/* Slab mover thread.
 * Sits waiting for a condition to jump off and shovel some memory about
 */
static void *slab_rebalance_thread(void *arg) {
    int was_busy = 0;
    int backoff_timer = 1;
    int backoff_max = 1000;
    /* So we first pass into cond_wait with the mutex held */
    mutex_lock(&slabs_rebalance_lock);

    /* Must finish moving page before stopping */
    // Initial state: slab_rebalance_signal = 0, do_run_slab_rebalance_thread = 1
    while (slab_rebalance_signal || do_run_slab_rebalance_thread) {
        if (slab_rebalance_signal == 1) {
            if (slab_rebalance_start() < 0) {
                /* Handle errors with more specificity as required. */
                slab_rebalance_signal = 0;
            }
        }

        if (slab_rebal.done) {
            slab_rebalance_finish();
        } else if (was_busy) {
            /* Stuck waiting for some items to unlock, so slow down a bit
             * to give them a chance to free up */
            usleep(backoff_timer);
            backoff_timer = backoff_timer * 2;
            if (backoff_timer > backoff_max)
                backoff_timer = backoff_max;
        }

        if (slab_rebalance_signal == 0) {
            /* always hold this lock while we're running */
            pthread_cond_wait(&slab_rebalance_cond, &slabs_rebalance_lock);
        }
    }

    return NULL;
}
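The mover thread above only wakes up when slab_rebalance_signal is raised and the condition variable is signaled; that happens on the worker side via slabs_reassign() / do_slabs_reassign(), shown next. As a rough sketch of just the hand-off — not the literal memcached code, and the helper name wake_slab_mover is invented here — the caller fills in slab_rebal, flips the signal, and pokes the condition variable while holding slabs_rebalance_lock:

/* Illustrative sketch only: how a caller could wake the mover thread.
 * Assumes src/dst were already validated (see do_slabs_reassign below). */
static void wake_slab_mover(int src, int dst) {
    pthread_mutex_lock(&slabs_rebalance_lock);
    slab_rebal.s_clsid = src;          /* source slab class to drain */
    slab_rebal.d_clsid = dst;          /* destination class that receives the page */
    slab_rebalance_signal = 1;         /* tells the thread to run slab_rebalance_start() */
    pthread_cond_signal(&slab_rebalance_cond); /* pop the thread out of cond_wait */
    pthread_mutex_unlock(&slabs_rebalance_lock);
}

Because the mover holds slabs_rebalance_lock whenever it is not blocked inside pthread_cond_wait(), the signal and the slab_rebal fields are always observed together and consistently.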
static enum reassign_result_type do_slabs_reassign(int src, int dst) {
    bool nospare = false;
    if (slab_rebalance_signal != 0)
        return REASSIGN_RUNNING;

    /* The source and destination classes must differ */
    if (src == dst)
        return REASSIGN_SRC_DST_SAME;

    /* Pick a source slab class and move one of its pages to dst; src must not
     * equal dst. Note: since 1.4.14, the "any source" form is supported,
     * e.g. 'slabs reassign -1 15'. */
    if (src == -1) {
        src = slabs_reassign_pick_any(dst);
        /* TODO: If we end up back at -1, return a new error type */
    }
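slabs_reassign_pick_any() itself is not shown in this excerpt. A minimal sketch of the contract it has to satisfy — return some class other than dst that still has a page to spare, or -1 if nothing qualifies — assuming memcached's usual slabclass[] array, its per-class page count slabclass[].slabs, and the class-id bounds POWER_SMALLEST..power_largest; the helper name pick_any_source is invented for illustration:

/* Sketch only -- not the literal memcached implementation. It just shows the
 * contract: pick a source class != dst that owns more than one page, so
 * donating a page cannot leave that class completely empty. */
static int pick_any_source(int dst) {
    for (int cls = POWER_SMALLEST; cls <= power_largest; cls++) {
        if (cls == dst)
            continue;
        if (slabclass[cls].slabs > 1)
            return cls;   /* has a spare page we can hand to dst */
    }
    return -1;            /* nothing can donate a page right now */
}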
/* refcount == 0 is safe since nobody can incr while item_lock is held.
 * refcount != 0 is impossible since flags/etc can be modified in other
 * threads. instead, note we found a busy one and bail. logic in do_item_get
 * will prevent busy items from continuing to be busy
 * NOTE: This is checking it_flags outside of an item lock. I believe this
 * works since it_flags is 8 bits, and we're only ever comparing a single bit
 * regardless. ITEM_SLABBED bit will always be correct since we're holding the
 * lock which modifies that bit. ITEM_LINKED won't exist if we're between an
 * item having ITEM_SLABBED removed, and the key hasn't been added to the item
 * yet. The memory barrier from the slabs lock should order the key write and the
 * flags to the item?
 * If ITEM_LINKED did exist and was just removed, but we still see it, that's
 * still safe since it will have a valid key, which we then lock, and then
 * recheck everything.
 * This may not be safe on all platforms; If not, slabs_alloc() will need to
 * seed the item key while holding slabs_lock.
 */
static int slab_rebalance_move(void) {
    slabclass_t *s_cls;
    int was_busy = 0;
    int refcount = 0;
    uint32_t hv;
    void *hold_lock;
    enum move_status status = MOVE_PASS;
    s_cls = &slabclass[slab_rebal.s_clsid];
    // the offset to check if completed or not
    int offset = ((char *)slab_rebal.slab_pos - (char *)slab_rebal.slab_start) / (s_cls->size);

    // skip acquiring the slabs lock for items we've already fully processed.
    if (slab_rebal.completed[offset] == 0) {
        pthread_mutex_lock(&slabs_lock);
        hv = 0;
        hold_lock = NULL;
        item *it = slab_rebal.slab_pos;

        item_chunk *ch = NULL;
        status = MOVE_PASS;
        if (it->it_flags & ITEM_CHUNK) {
            /* This chunk is a chained part of a larger item. */
            ch = (item_chunk *) it;
            /* Instead, we use the head chunk to find the item and effectively
             * lock the entire structure. If a chunk has ITEM_CHUNK flag, its
             * head cannot be slabbed, so the normal routine is safe. */
            it = ch->head;
            assert(it->it_flags & ITEM_CHUNKED);
        }

        /* ITEM_FETCHED when ITEM_SLABBED is overloaded to mean we've cleared
         * the chunk for move. Only these two flags should exist.
         */
        if (it->it_flags != (ITEM_SLABBED|ITEM_FETCHED)) {
            /* ITEM_SLABBED can only be added/removed under the slabs_lock */
            if (it->it_flags & ITEM_SLABBED) {
                assert(ch == NULL);
                slab_rebalance_cut_free(s_cls, it);
                status = MOVE_FROM_SLAB;
            } else if ((it->it_flags & ITEM_LINKED) != 0) {
                /* If it doesn't have ITEM_SLABBED, the item could be in any
                 * state on its way to being freed or written to. If no
                 * ITEM_SLABBED, but it's had ITEM_LINKED, it must be active
                 * and have the key written to it already. */
                hv = hash(ITEM_key(it), it->nkey);
                if ((hold_lock = item_trylock(hv)) == NULL) {
                    status = MOVE_LOCKED;
                } else {
                    bool is_linked = (it->it_flags & ITEM_LINKED);
                    refcount = refcount_incr(it);
                    if (refcount == 2) { /* item is linked but not busy */
                        /* Double check ITEM_LINKED flag here, since we're
                         * past a memory barrier from the mutex. */
                        if (is_linked) {
                            status = MOVE_FROM_LRU;
                        } else {
                            /* refcount == 1 + !ITEM_LINKED means the item is being
                             * uploaded to, or was just unlinked but hasn't been freed
                             * yet. Let it bleed off on its own and try again later */
                            status = MOVE_BUSY;
                        }
                    } else if (refcount > 2 && is_linked) {
                        // TODO: Mark items for delete/rescue and process
                        // outside of the main loop.
                        if (slab_rebal.busy_loops > SLAB_MOVE_MAX_LOOPS) {
                            slab_rebal.busy_deletes++;
                            // Only safe to hold slabs lock because refcount
                            // can't drop to 0 until we release item lock.
                            STORAGE_delete(storage, it);
                            pthread_mutex_unlock(&slabs_lock);
                            do_item_unlink(it, hv);
                            pthread_mutex_lock(&slabs_lock);
                        }
                        status = MOVE_BUSY;
                    } else {
                        if (settings.verbose > 2) {
                            fprintf(stderr, "Slab reassign hit a busy item: refcount: %d (%d -> %d)\n",
                                it->refcount, slab_rebal.s_clsid, slab_rebal.d_clsid);
                        }
                        status = MOVE_BUSY;
                    }
                    /* Item lock must be held while modifying refcount */
                    if (status == MOVE_BUSY) {
                        refcount_decr(it);
                        item_trylock_unlock(hold_lock);
                    }
                }
            } else {
                /* See above comment. No ITEM_SLABBED or ITEM_LINKED. Mark
                 * busy and wait for item to complete its upload. */
                status = MOVE_BUSY;
            }
        }
        int save_item = 0;
        item *new_it = NULL;
        size_t ntotal = 0;
        switch (status) {
            case MOVE_FROM_LRU:
                /* Lock order is LRU locks -> slabs_lock. unlink uses LRU lock.
                 * We only need to hold the slabs_lock while initially looking
                 * at an item, and at this point we have an exclusive refcount
                 * (2) + the item is locked. Drop slabs lock, drop item to
                 * refcount 1 (just our own), then fall through and wipe it */
                /* Check if expired or flushed */
                ntotal = ITEM_ntotal(it);
#ifdef EXTSTORE
                if (it->it_flags & ITEM_HDR) {
                    ntotal = (ntotal - it->nbytes) + sizeof(item_hdr);
                }
#endif
                /* REQUIRES slabs_lock: CHECK FOR cls->sl_curr > 0 */
                if (ch == NULL && (it->it_flags & ITEM_CHUNKED)) {
                    /* Chunked should be identical to non-chunked, except we need
                     * to swap out ntotal for the head-chunk-total. */
                    ntotal = s_cls->size;
                }
                if ((it->exptime != 0 && it->exptime < current_time)
                        || item_is_flushed(it)) {
                    /* Expired, don't save. */
                    save_item = 0;
                } else if (ch == NULL &&
                        (new_it = slab_rebalance_alloc(ntotal, slab_rebal.s_clsid)) == NULL) {
                    /* Not a chunk of an item, and nomem. */
                    save_item = 0;
                    slab_rebal.evictions_nomem++;
                } else if (ch != NULL &&
                        (new_it = slab_rebalance_alloc(s_cls->size, slab_rebal.s_clsid)) == NULL) {
                    /* Is a chunk of an item, and nomem. */
                    save_item = 0;
                    slab_rebal.evictions_nomem++;
                } else {
                    /* Was whatever it was, and we have memory for it. */
                    save_item = 1;
                }
                pthread_mutex_unlock(&slabs_lock);
                if (save_item) {
                    if (ch == NULL) {
                        assert((new_it->it_flags & ITEM_CHUNKED) == 0);
                        /* if free memory, memcpy. clear prev/next/h_bucket */
                        memcpy(new_it, it, ntotal);
                        new_it->prev = 0;
                        new_it->next = 0;
                        new_it->h_next = 0;
                        /* These are definitely required. else fails assert */
                        new_it->it_flags &= ~ITEM_LINKED;
                        new_it->refcount = 0;
                        do_item_replace(it, new_it, hv);
                        /* Need to walk the chunks and repoint head */
                        if (new_it->it_flags & ITEM_CHUNKED) {
                            item_chunk *fch = (item_chunk *) ITEM_schunk(new_it);
                            fch->next->prev = fch;
                            while (fch) {
                                fch->head = new_it;
                                fch = fch->next;
                            }
                        }
                        it->refcount = 0;
                        it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
#ifdef DEBUG_SLAB_MOVER
                        memcpy(ITEM_key(it), "deadbeef", 8);
#endif
                        slab_rebal.rescues++;
                    } else {
                        item_chunk *nch = (item_chunk *) new_it;
                        /* Chunks always have head chunk (the main it) */
                        ch->prev->next = nch;
                        if (ch->next) ch->next->prev = nch;
                        memcpy(nch, ch, ch->used + sizeof(item_chunk));
                        ch->refcount = 0;
                        ch->it_flags = ITEM_SLABBED|ITEM_FETCHED;
                        slab_rebal.chunk_rescues++;
#ifdef DEBUG_SLAB_MOVER
                        memcpy(ITEM_key((item *)ch), "deadbeef", 8);
#endif
                        refcount_decr(it);
                    }
                    slab_rebal.completed[offset] = 1;
                } else {
                    /* unlink and mark as done if it's not a chunked item,
                     * as chunked items require more book-keeping. */
                    STORAGE_delete(storage, it);
                    if (!ch && (it->it_flags & ITEM_CHUNKED) == 0) {
                        do_item_unlink(it, hv);
                        it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
                        it->refcount = 0;
#ifdef DEBUG_SLAB_MOVER
                        memcpy(ITEM_key(it), "deadbeef", 8);
#endif
                        slab_rebal.completed[offset] = 1;
                    } else {
                        ntotal = ITEM_ntotal(it);
                        do_item_unlink(it, hv);
                        slabs_free(it, ntotal, slab_rebal.s_clsid);
                        /* Swing around again later to remove it from the freelist. */
                        slab_rebal.busy_items++;
                        was_busy++;
                    }
                }
                item_trylock_unlock(hold_lock);
                pthread_mutex_lock(&slabs_lock);
                /* Always remove the ntotal, as we added it in during
                 * do_slabs_alloc() when copying the item.
                 */
                break;
            case MOVE_FROM_SLAB:
                slab_rebal.completed[offset] = 1;
                it->refcount = 0;
                it->it_flags = ITEM_SLABBED|ITEM_FETCHED;
#ifdef DEBUG_SLAB_MOVER
                memcpy(ITEM_key(it), "deadbeef", 8);
#endif
                break;
            case MOVE_BUSY:
            case MOVE_LOCKED:
                slab_rebal.busy_items++;
                was_busy++;
                break;
            case MOVE_PASS:
                break;
        }
        pthread_mutex_unlock(&slabs_lock);
    }

    // Note: slab_rebal.* is occasionally protected under slabs_lock, but
    // the mover thread is the only user while active: so it's only necessary
    // for start/stop synchronization.
    slab_rebal.slab_pos = (char *)slab_rebal.slab_pos + s_cls->size;

    if (slab_rebal.slab_pos >= slab_rebal.slab_end) {
        /* Some items were busy, start again from the top */
        if (slab_rebal.busy_items) {
            slab_rebal.slab_pos = slab_rebal.slab_start;
            STATS_LOCK();
            stats.slab_reassign_busy_items += slab_rebal.busy_items;
            STATS_UNLOCK();
            slab_rebal.busy_items = 0;
            slab_rebal.busy_loops++;
        } else {
            slab_rebal.done++;
        }
    }

    return was_busy;
}
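Note that each call to slab_rebalance_move() handles only the single object at slab_rebal.slab_pos and then advances the cursor by one slot; the call site inside slab_rebalance_thread() is elided from the excerpt above. Below is a hedged sketch — not the actual thread body, which interleaves this with the exponential backoff shown earlier — of a driver loop that walks a whole page once slab_rebalance_start() has filled in slab_start/slab_end; the helper name drain_one_page is invented here:

/* Sketch only: one way to drive slab_rebalance_move() across a page. */
static void drain_one_page(void) {
    while (!slab_rebal.done) {
        int was_busy = slab_rebalance_move();   /* processes slab_rebal.slab_pos */
        if (was_busy) {
            usleep(50);   /* give busy items a chance to be released */
        }
    }
    slab_rebalance_finish();   /* hand the emptied page over to d_clsid */
}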
3. Twemcache's Implementation

3.1 Concept Overview

Randomly select a slab, then free all of the data stored in that slab so the entire slab becomes available for new allocations.

3.2 Code Implementation

First, pick a random slab and evict it:
/*
 * Get a random slab from all active slabs and evict it for new allocation.
 *
 * Note that the slab_table enables us to have O(1) lookup for every slab in
 * the system. The inserts into the table are just appends - O(1) and there
 * are no deletes from the slab_table. These two constraints allows us to keep
 * our random choice uniform.
 */
static struct slab *
slab_evict_rand(void)
{
    struct slab *slab;
    uint32_t tries;

    tries = SLAB_RAND_MAX_TRIES;
    do {
        slab = slab_table_rand();
        tries--;
    } while (tries > 0 && slab->refcount != 0);

    if (tries == 0) {
        /* all randomly chosen slabs are in use */
        return NULL;
    }

    log_debug(LOG_DEBUG, "random-evicting slab %p with id %u", slab, slab->id);

    slab_evict_one(slab);

    return slab;
}
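slab_table_rand() is not shown here; the O(1) uniform pick it performs follows directly from the append-only slab_table described in the comment above. A sketch under the assumption that a counter such as nslab_table records how many slabs have been appended (both that name and the sketch function are illustrative, not the literal Twemcache code):

/* Sketch only: an O(1) uniform pick over the append-only slab_table. */
static struct slab *
slab_table_rand_sketch(void)
{
    /* uniform because entries are only ever appended, never removed */
    uint32_t idx = (uint32_t)rand() % nslab_table;

    return slab_table[idx];
}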
The code that actually performs the eviction:
/*
 * Evict a slab by evicting all the items within it. This means that the
 * items that are carved out of the slab must either be deleted from their
 * a) hash + lru Q, or b) free Q. The candidate slab itself must also be
 * delinked from its respective slab pool so that it is available for reuse.
 *
 * Eviction complexity is O(#items/slab).
 */
static void
slab_evict_one(struct slab *slab)
{
    struct slabclass *p;
    struct item *it;
    uint32_t i;

    p = &slabclass[slab->id];

    /* candidate slab is also the current slab */
    if (p->free_item != NULL && slab == item_2_slab(p->free_item)) {
        p->nfree_item = 0;
        p->free_item = NULL;
    }

    /* delete slab items either from hash + lru Q or free Q */
    for (i = 0; i < p->nitem; i++) {
        it = slab_2_item(slab, i, p->size);