/*- * See the file LICENSE for redistribution information. * * Copyright (c) 1996, 1997, 1998 * Sleepycat Software. All rights reserved. */ /* * Copyright (c) 1990, 1993, 1994 * Margo Seltzer. All rights reserved. */ /* * Copyright (c) 1990, 1993, 1994 * The Regents of the University of California. All rights reserved. * * This code is derived from software contributed to Berkeley by * Margo Seltzer. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. All advertising materials mentioning features or use of this software * must display the following acknowledgement: * This product includes software developed by the University of * California, Berkeley and its contributors. * 4. Neither the name of the University nor the names of its contributors * may be used to endorse or promote products derived from this software * without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ #include "config.h" #ifndef lint static const char sccsid[] = "@(#)hash.c 10.45 (Sleepycat) 5/11/98"; #endif /* not lint */ #ifndef NO_SYSTEM_INCLUDES #include #include #include #include #endif #include "db_int.h" #include "shqueue.h" #include "db_page.h" #include "db_am.h" #include "db_ext.h" #include "hash.h" #include "log.h" static int __ham_c_close __P((DBC *)); static int __ham_c_del __P((DBC *, u_int32_t)); static int __ham_c_get __P((DBC *, DBT *, DBT *, u_int32_t)); static int __ham_c_put __P((DBC *, DBT *, DBT *, u_int32_t)); static int __ham_c_init __P((DB *, DB_TXN *, DBC **)); static int __ham_cursor __P((DB *, DB_TXN *, DBC **)); static int __ham_delete __P((DB *, DB_TXN *, DBT *, u_int32_t)); static int __ham_dup_return __P((HTAB *, HASH_CURSOR *, DBT *, u_int32_t)); static int __ham_get __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); static void __ham_init_htab __P((HTAB *, u_int32_t, u_int32_t)); static int __ham_lookup __P((HTAB *, HASH_CURSOR *, const DBT *, u_int32_t, db_lockmode_t)); static int __ham_overwrite __P((HTAB *, HASH_CURSOR *, DBT *)); static int __ham_put __P((DB *, DB_TXN *, DBT *, DBT *, u_int32_t)); static int __ham_sync __P((DB *, u_int32_t)); /************************** INTERFACE ROUTINES ***************************/ /* OPEN/CLOSE */ /* * __ham_open -- * * PUBLIC: int __ham_open __P((DB *, DB_INFO *)); */ int __ham_open(dbp, dbinfo) DB *dbp; DB_INFO *dbinfo; { DB_ENV *dbenv; DBC *curs; HTAB *hashp; int file_existed, ret; dbenv = dbp->dbenv; if ((hashp = (HTAB *)__db_calloc(1, sizeof(HTAB))) == NULL) return (ENOMEM); hashp->dbp = dbp; /* Set the hash function if specified by the user. */ if (dbinfo != NULL && dbinfo->h_hash != NULL) hashp->hash = dbinfo->h_hash; /* * Initialize the remaining fields of the dbp. The type, close and * fd functions are all set in db_open. */ dbp->internal = hashp; dbp->cursor = __ham_cursor; dbp->del = __ham_delete; dbp->get = __ham_get; dbp->put = __ham_put; dbp->sync = __ham_sync; /* If locking is turned on, lock the meta data page. */ if (F_ISSET(dbp, DB_AM_LOCKING)) { dbp->lock.pgno = BUCKET_INVALID; if ((ret = lock_get(dbenv->lk_info, dbp->locker, 0, &dbp->lock_dbt, DB_LOCK_READ, &hashp->hlock)) != 0) { if (ret < 0) ret = EAGAIN; goto out; } } /* * Now, we can try to read the meta-data page and figure out * if we set up locking and get the meta-data page properly. * If this is a new file, initialize it, and put it back dirty. */ if ((ret = __ham_get_page(hashp->dbp, 0, (PAGE **)&hashp->hdr)) != 0) goto out; /* Initialize the hashp structure */ if (hashp->hdr->magic == DB_HASHMAGIC) { file_existed = 1; /* File exists, verify the data in the header. */ if (hashp->hash == NULL) hashp->hash = hashp->hdr->version < 5 ? __ham_func4 : __ham_func5; if (hashp->hash(CHARKEY, sizeof(CHARKEY)) != hashp->hdr->h_charkey) { __db_err(hashp->dbp->dbenv, "hash: incompatible hash function"); ret = EINVAL; goto out; } if (F_ISSET(hashp->hdr, DB_HASH_DUP)) F_SET(dbp, DB_AM_DUP); } else { /* * File does not exist, we must initialize the header. If * locking is enabled that means getting a write lock first. */ file_existed = 0; if (F_ISSET(dbp, DB_AM_LOCKING) && ((ret = lock_put(dbenv->lk_info, hashp->hlock)) != 0 || (ret = lock_get(dbenv->lk_info, dbp->locker, 0, &dbp->lock_dbt, DB_LOCK_WRITE, &hashp->hlock)) != 0)) { if (ret < 0) ret = EAGAIN; goto out; } __ham_init_htab(hashp, dbinfo != NULL ? dbinfo->h_nelem : 0, dbinfo != NULL ? dbinfo->h_ffactor : 0); if (F_ISSET(dbp, DB_AM_DUP)) F_SET(hashp->hdr, DB_HASH_DUP); if ((ret = __ham_dirty_page(hashp, (PAGE *)hashp->hdr)) != 0) goto out; } /* Initialize the default cursor. */ __ham_c_init(dbp, NULL, &curs); TAILQ_INSERT_TAIL(&dbp->curs_queue, curs, links); /* Allocate memory for our split buffer. */ if ((hashp->split_buf = (PAGE *)__db_malloc(dbp->pgsize)) == NULL) { ret = ENOMEM; goto out; } #ifdef NO_STATISTICS_FOR_DB_ERR __db_err(dbp->dbenv, "%s%lx\n%s%ld\n%s%ld\n%s%ld\n%s%ld\n%s0x%lx\n%s0x%lx\n%s%ld\n%s%ld\n%s0x%lx", "TABLE POINTER ", (long)hashp, "BUCKET SIZE ", (long)hashp->hdr->pagesize, "FILL FACTOR ", (long)hashp->hdr->ffactor, "MAX BUCKET ", (long)hashp->hdr->max_bucket, "OVFL POINT ", (long)hashp->hdr->ovfl_point, "LAST FREED ", (long)hashp->hdr->last_freed, "HIGH MASK ", (long)hashp->hdr->high_mask, "LOW MASK ", (long)hashp->hdr->low_mask, "NELEM ", (long)hashp->hdr->nelem, "FLAGS ", (long)hashp->hdr->flags); #endif /* Release the meta data page */ (void)__ham_put_page(hashp->dbp, (PAGE *)hashp->hdr, 0); if (F_ISSET(dbp, DB_AM_LOCKING) && (ret = lock_put(dbenv->lk_info, hashp->hlock)) != 0) { if (ret < 0) ret = EAGAIN; goto out; } hashp->hlock = 0; hashp->hdr = NULL; /* Sync the file so that we know that the meta data goes to disk. */ if (!file_existed && (ret = dbp->sync(dbp, 0)) != 0) goto out; return (0); out: (void)__ham_close(dbp); return (ret); } /* * PUBLIC: int __ham_close __P((DB *)); */ int __ham_close(dbp) DB *dbp; { HTAB *hashp; int ret, t_ret; DEBUG_LWRITE(dbp, NULL, "ham_close", NULL, NULL, 0); hashp = (HTAB *)dbp->internal; ret = 0; /* Free the split page. */ if (hashp->split_buf) FREE(hashp->split_buf, dbp->pgsize); if (hashp->hdr && (t_ret = __ham_put_page(hashp->dbp, (PAGE *)hashp->hdr, 0)) != 0 && ret == 0) ret = t_ret; if (hashp->hlock && (t_ret = lock_put(hashp->dbp->dbenv->lk_info, hashp->hlock)) != 0 && ret == 0) ret = t_ret; FREE(hashp, sizeof(HTAB)); dbp->internal = NULL; return (ret); } /************************** LOCAL CREATION ROUTINES **********************/ /* * Returns 0 on No Error */ static void __ham_init_htab(hashp, nelem, ffactor) HTAB *hashp; u_int32_t nelem, ffactor; { int32_t l2, nbuckets; memset(hashp->hdr, 0, sizeof(HASHHDR)); hashp->hdr->ffactor = ffactor; hashp->hdr->pagesize = hashp->dbp->pgsize; ZERO_LSN(hashp->hdr->lsn); hashp->hdr->magic = DB_HASHMAGIC; hashp->hdr->version = DB_HASHVERSION; if (hashp->hash == NULL) hashp->hash = hashp->hdr->version < 5 ? __ham_func4 : __ham_func5; hashp->hdr->h_charkey = hashp->hash(CHARKEY, sizeof(CHARKEY)); if (nelem != 0 && hashp->hdr->ffactor != 0) { nelem = (nelem - 1) / hashp->hdr->ffactor + 1; l2 = __db_log2(nelem > 2 ? nelem : 2); } else l2 = 2; nbuckets = 1 << l2; hashp->hdr->ovfl_point = l2; hashp->hdr->last_freed = PGNO_INVALID; hashp->hdr->max_bucket = hashp->hdr->high_mask = nbuckets - 1; hashp->hdr->low_mask = (nbuckets >> 1) - 1; memcpy(hashp->hdr->uid, hashp->dbp->lock.fileid, DB_FILE_ID_LEN); } /********************** DESTROY/CLOSE ROUTINES ************************/ /* * Write modified pages to disk * * Returns: * 0 == OK * -1 ERROR */ static int __ham_sync(dbp, flags) DB *dbp; u_int32_t flags; { int ret; DEBUG_LWRITE(dbp, NULL, "ham_sync", NULL, NULL, flags); if ((ret = __db_syncchk(dbp, flags)) != 0) return (ret); if (F_ISSET(dbp, DB_AM_RDONLY)) return (0); if ((ret = memp_fsync(dbp->mpf)) == DB_INCOMPLETE) ret = 0; return (ret); } /*******************************SEARCH ROUTINES *****************************/ /* * All the access routines return * * Returns: * 0 on SUCCESS * 1 to indicate an external ERROR (i.e. key not found, etc) * -1 to indicate an internal ERROR (i.e. out of memory, etc) */ static int __ham_get(dbp, txn, key, data, flags) DB *dbp; DB_TXN *txn; DBT *key; DBT *data; u_int32_t flags; { DB *ldbp; HTAB *hashp; HASH_CURSOR *hcp; int ret, t_ret; DEBUG_LREAD(dbp, txn, "ham_get", key, NULL, flags); if ((ret = __db_getchk(dbp, key, data, flags)) != 0) return (ret); ldbp = dbp; if (F_ISSET(dbp, DB_AM_THREAD) && (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)ldbp->internal; SET_LOCKER(ldbp, txn); GET_META(ldbp, hashp); hashp->hash_accesses++; hcp = (HASH_CURSOR *)TAILQ_FIRST(&ldbp->curs_queue)->internal; if ((ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_READ)) == 0) if (F_ISSET(hcp, H_OK)) ret = __ham_dup_return(hashp, hcp, data, DB_FIRST); else /* Key was not found */ ret = DB_NOTFOUND; if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0) ret = t_ret; RELEASE_META(ldbp, hashp); if (F_ISSET(dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } static int __ham_put(dbp, txn, key, data, flags) DB *dbp; DB_TXN *txn; DBT *key; DBT *data; u_int32_t flags; { DB *ldbp; DBT tmp_val, *myval; HASH_CURSOR *hcp; HTAB *hashp; u_int32_t nbytes; int ret, t_ret; DEBUG_LWRITE(dbp, txn, "ham_put", key, data, flags); if ((ret = __db_putchk(dbp, key, data, flags, F_ISSET(dbp, DB_AM_RDONLY), F_ISSET(dbp, DB_AM_DUP))) != 0) return (ret); ldbp = dbp; if (F_ISSET(dbp, DB_AM_THREAD) && (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)ldbp->internal; SET_LOCKER(ldbp, txn); GET_META(ldbp, hashp); hcp = TAILQ_FIRST(&ldbp->curs_queue)->internal; nbytes = (ISBIG(hashp, key->size) ? HOFFPAGE_PSIZE : HKEYDATA_PSIZE(key->size)) + (ISBIG(hashp, data->size) ? HOFFPAGE_PSIZE : HKEYDATA_PSIZE(data->size)); hashp->hash_accesses++; ret = __ham_lookup(hashp, hcp, key, nbytes, DB_LOCK_WRITE); if (ret == DB_NOTFOUND) { ret = 0; if (hcp->seek_found_page != PGNO_INVALID && hcp->seek_found_page != hcp->pgno) { if ((ret = __ham_item_done(hashp, hcp, 0)) != 0) goto out; hcp->pgno = hcp->seek_found_page; hcp->bndx = NDX_INVALID; } if (F_ISSET(data, DB_DBT_PARTIAL) && data->doff != 0) { /* * Doing a partial put, but the key does not exist * and we are not beginning the write at 0. We * must create a data item padded up to doff and * then write the new bytes represented by val. */ ret = __ham_init_dbt(&tmp_val, data->size + data->doff, &hcp->big_data, &hcp->big_datalen); if (ret == 0) { memset(tmp_val.data, 0, data->doff); memcpy((u_int8_t *)tmp_val.data + data->doff, data->data, data->size); myval = &tmp_val; } } else myval = (DBT *)data; if (ret == 0) ret = __ham_add_el(hashp, hcp, key, myval, H_KEYDATA); } else if (ret == 0 && F_ISSET(hcp, H_OK)) { if (flags == DB_NOOVERWRITE) ret = DB_KEYEXIST; else if (F_ISSET(ldbp, DB_AM_DUP)) ret = __ham_add_dup(hashp, hcp, data, DB_KEYLAST); else ret = __ham_overwrite(hashp, hcp, data); } /* Free up all the cursor pages. */ if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0) ret = t_ret; /* Now check if we have to grow. */ out: if (ret == 0 && F_ISSET(hcp, H_EXPAND)) { ret = __ham_expand_table(hashp); F_CLR(hcp, H_EXPAND); } if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0) ret = t_ret; RELEASE_META(ldbp, hashp); if (F_ISSET(dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } static int __ham_cursor(dbp, txnid, dbcp) DB *dbp; DB_TXN *txnid; DBC **dbcp; { int ret; DEBUG_LWRITE(dbp, txnid, "ham_cursor", NULL, NULL, 0); if ((ret = __ham_c_init(dbp, txnid, dbcp)) != 0) return (ret); DB_THREAD_LOCK(dbp); TAILQ_INSERT_TAIL(&dbp->curs_queue, *dbcp, links); DB_THREAD_UNLOCK(dbp); return (ret); } static int __ham_c_init(dbp, txnid, dbcp) DB *dbp; DB_TXN *txnid; DBC **dbcp; { DBC *db_curs; HASH_CURSOR *new_curs; if ((db_curs = (DBC *)__db_calloc(sizeof(DBC), 1)) == NULL) return (ENOMEM); if ((new_curs = (HASH_CURSOR *)__db_calloc(sizeof(struct cursor_t), 1)) == NULL) { FREE(db_curs, sizeof(DBC)); return (ENOMEM); } db_curs->internal = new_curs; db_curs->c_close = __ham_c_close; db_curs->c_del = __ham_c_del; db_curs->c_get = __ham_c_get; db_curs->c_put = __ham_c_put; db_curs->txn = txnid; db_curs->dbp = dbp; new_curs->db_cursor = db_curs; __ham_item_init(new_curs); if (dbcp != NULL) *dbcp = db_curs; return (0); } static int __ham_delete(dbp, txn, key, flags) DB *dbp; DB_TXN *txn; DBT *key; u_int32_t flags; { DB *ldbp; HTAB *hashp; HASH_CURSOR *hcp; int ret, t_ret; DEBUG_LWRITE(dbp, txn, "ham_delete", key, NULL, flags); if ((ret = __db_delchk(dbp, key, flags, F_ISSET(dbp, DB_AM_RDONLY))) != 0) return (ret); ldbp = dbp; if (F_ISSET(dbp, DB_AM_THREAD) && (ret = __db_gethandle(dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)ldbp->internal; SET_LOCKER(ldbp, txn); GET_META(ldbp, hashp); hcp = TAILQ_FIRST(&ldbp->curs_queue)->internal; hashp->hash_accesses++; if ((ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_WRITE)) == 0) if (F_ISSET(hcp, H_OK)) ret = __ham_del_pair(hashp, hcp, 1); else ret = DB_NOTFOUND; if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0) ret = t_ret; RELEASE_META(ldbp, hashp); if (F_ISSET(dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } /* ****************** CURSORS ********************************** */ static int __ham_c_close(cursor) DBC *cursor; { DB *ldbp; int ret; DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_close", NULL, NULL, 0); /* * If the pagep, dpagep, and lock fields of the cursor are all NULL, * then there really isn't a need to get a handle here. However, * the normal case is that at least one of those fields is non-NULL, * and putting those checks in here would couple the ham_item_done * functionality with cursor close which would be pretty disgusting. * Instead, we pay the overhead here of always getting the handle. */ ldbp = cursor->dbp; if (F_ISSET(cursor->dbp, DB_AM_THREAD) && (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0) return (ret); ret = __ham_c_iclose(ldbp, cursor); if (F_ISSET(ldbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } /* * __ham_c_iclose -- * * Internal cursor close routine; assumes it is being passed the correct * handle, rather than getting and putting a handle. * * PUBLIC: int __ham_c_iclose __P((DB *, DBC *)); */ int __ham_c_iclose(dbp, dbc) DB *dbp; DBC *dbc; { HASH_CURSOR *hcp; HTAB *hashp; int ret; hashp = (HTAB *)dbp->internal; hcp = (HASH_CURSOR *)dbc->internal; ret = __ham_item_done(hashp, hcp, 0); if (hcp->big_key) FREE(hcp->big_key, hcp->big_keylen); if (hcp->big_data) FREE(hcp->big_data, hcp->big_datalen); /* * All cursors (except the default ones) are linked off the master. * Therefore, when we close the cursor, we have to remove it from * the master, not the local one. * XXX I am always removing from the master; what about local cursors? */ DB_THREAD_LOCK(dbc->dbp); TAILQ_REMOVE(&dbc->dbp->curs_queue, dbc, links); DB_THREAD_UNLOCK(dbc->dbp); FREE(hcp, sizeof(HASH_CURSOR)); FREE(dbc, sizeof(DBC)); return (ret); } static int __ham_c_del(cursor, flags) DBC *cursor; u_int32_t flags; { DB *ldbp; HASH_CURSOR *hcp; HASH_CURSOR save_curs; HTAB *hashp; db_pgno_t ppgno, chg_pgno; int ret, t_ret; DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_del", NULL, NULL, flags); ldbp = cursor->dbp; if (F_ISSET(cursor->dbp, DB_AM_THREAD) && (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)ldbp->internal; hcp = (HASH_CURSOR *)cursor->internal; save_curs = *hcp; if ((ret = __db_cdelchk(ldbp, flags, F_ISSET(ldbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0) return (ret); if (F_ISSET(hcp, H_DELETED)) return (DB_NOTFOUND); SET_LOCKER(hashp->dbp, cursor->txn); GET_META(hashp->dbp, hashp); hashp->hash_accesses++; if ((ret = __ham_get_cpage(hashp, hcp, DB_LOCK_WRITE)) != 0) goto out; if (F_ISSET(hcp, H_ISDUP) && hcp->dpgno != PGNO_INVALID) { /* * We are about to remove a duplicate from offpage. * * There are 4 cases. * 1. We will remove an item on a page, but there are more * items on that page. * 2. We will remove the last item on a page, but there is a * following page of duplicates. * 3. We will remove the last item on a page, this page was the * last page in a duplicate set, but there were dups before * it. * 4. We will remove the last item on a page, removing the last * duplicate. * In case 1 hcp->dpagep is unchanged. * In case 2 hcp->dpagep comes back pointing to the next dup * page. * In case 3 hcp->dpagep comes back NULL. * In case 4 hcp->dpagep comes back NULL. * * Case 4 results in deleting the pair off the master page. * The normal code for doing this knows how to delete the * duplicates, so we will handle this case in the normal code. */ ppgno = PREV_PGNO(hcp->dpagep); if (ppgno == PGNO_INVALID && NEXT_PGNO(hcp->dpagep) == PGNO_INVALID && NUM_ENT(hcp->dpagep) == 1) goto normal; /* Remove item from duplicate page. */ chg_pgno = hcp->dpgno; if ((ret = __db_drem(hashp->dbp, &hcp->dpagep, hcp->dndx, __ham_del_page)) != 0) goto out; if (hcp->dpagep == NULL) { if (ppgno != PGNO_INVALID) { /* Case 3 */ hcp->dpgno = ppgno; if ((ret = __ham_get_cpage(hashp, hcp, DB_LOCK_READ)) != 0) goto out; hcp->dndx = NUM_ENT(hcp->dpagep); F_SET(hcp, H_DELETED); } else { /* Case 4 */ ret = __ham_del_pair(hashp, hcp, 1); hcp->dpgno = PGNO_INVALID; /* * Delpair updated the cursor queue, so we * don't have to do that here. */ chg_pgno = PGNO_INVALID; } } else if (PGNO(hcp->dpagep) != hcp->dpgno) { hcp->dndx = 0; /* Case 2 */ hcp->dpgno = PGNO(hcp->dpagep); if (ppgno == PGNO_INVALID) memcpy(HOFFDUP_PGNO(P_ENTRY(hcp->pagep, H_DATAINDEX(hcp->bndx))), &hcp->dpgno, sizeof(db_pgno_t)); F_SET(hcp, H_DELETED); } else /* Case 1 */ F_SET(hcp, H_DELETED); if (chg_pgno != PGNO_INVALID) __ham_c_update(hcp, chg_pgno, 0, 0, 1); } else if (F_ISSET(hcp, H_ISDUP)) { /* on page */ if (hcp->dup_off == 0 && DUP_SIZE(hcp->dup_len) == LEN_HDATA(hcp->pagep, hashp->hdr->pagesize, hcp->bndx)) ret = __ham_del_pair(hashp, hcp, 1); else { DBT repldbt; repldbt.flags = 0; F_SET(&repldbt, DB_DBT_PARTIAL); repldbt.doff = hcp->dup_off; repldbt.dlen = DUP_SIZE(hcp->dup_len); repldbt.size = 0; ret = __ham_replpair(hashp, hcp, &repldbt, 0); hcp->dup_tlen -= DUP_SIZE(hcp->dup_len); F_SET(hcp, H_DELETED); __ham_c_update(hcp, hcp->pgno, DUP_SIZE(hcp->dup_len), 0, 1); } } else /* Not a duplicate */ normal: ret = __ham_del_pair(hashp, hcp, 1); out: if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0) ret = t_ret; if (ret != 0) *hcp = save_curs; RELEASE_META(hashp->dbp, hashp); if (F_ISSET(cursor->dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } static int __ham_c_get(cursor, key, data, flags) DBC *cursor; DBT *key; DBT *data; u_int32_t flags; { DB *ldbp; HTAB *hashp; HASH_CURSOR *hcp, save_curs; int get_key, ret, t_ret; DEBUG_LREAD(cursor->dbp, cursor->txn, "ham_c_get", flags == DB_SET || flags == DB_SET_RANGE ? key : NULL, NULL, flags); ldbp = cursor->dbp; if (F_ISSET(cursor->dbp, DB_AM_THREAD) && (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)(ldbp->internal); hcp = (HASH_CURSOR *)cursor->internal; save_curs = *hcp; if ((ret = __db_cgetchk(hashp->dbp, key, data, flags, IS_VALID(hcp))) != 0) return (ret); SET_LOCKER(hashp->dbp, cursor->txn); GET_META(hashp->dbp, hashp); hashp->hash_accesses++; hcp->seek_size = 0; ret = 0; get_key = 1; switch (flags) { case DB_PREV: if (hcp->bucket != BUCKET_INVALID) { ret = __ham_item_prev(hashp, hcp, DB_LOCK_READ); break; } /* FALLTHROUGH */ case DB_LAST: ret = __ham_item_last(hashp, hcp, DB_LOCK_READ); break; case DB_FIRST: ret = __ham_item_first(hashp, hcp, DB_LOCK_READ); break; case DB_NEXT: if (hcp->bucket == BUCKET_INVALID) hcp->bucket = 0; ret = __ham_item_next(hashp, hcp, DB_LOCK_READ); break; case DB_SET: case DB_SET_RANGE: ret = __ham_lookup(hashp, hcp, key, 0, DB_LOCK_READ); get_key = 0; break; case DB_CURRENT: if (F_ISSET(hcp, H_DELETED)) { ret = DB_KEYEMPTY; goto out; } ret = __ham_item(hashp, hcp, DB_LOCK_READ); break; } /* * Must always enter this loop to do error handling and * check for big key/data pair. */ while (1) { if (ret != 0 && ret != DB_NOTFOUND) goto out1; else if (F_ISSET(hcp, H_OK)) { /* Get the key. */ if (get_key && (ret = __db_ret(hashp->dbp, hcp->pagep, H_KEYINDEX(hcp->bndx), key, &hcp->big_key, &hcp->big_keylen)) != 0) goto out1; ret = __ham_dup_return(hashp, hcp, data, flags); break; } else if (!F_ISSET(hcp, H_NOMORE)) { abort(); break; } /* * Ran out of entries in a bucket; change buckets. */ switch (flags) { case DB_LAST: case DB_PREV: ret = __ham_item_done(hashp, hcp, 0); if (hcp->bucket == 0) { ret = DB_NOTFOUND; goto out1; } hcp->bucket--; hcp->bndx = NDX_INVALID; if (ret == 0) ret = __ham_item_prev(hashp, hcp, DB_LOCK_READ); break; case DB_FIRST: case DB_NEXT: ret = __ham_item_done(hashp, hcp, 0); hcp->bndx = NDX_INVALID; hcp->bucket++; hcp->pgno = PGNO_INVALID; hcp->pagep = NULL; if (hcp->bucket > hashp->hdr->max_bucket) { ret = DB_NOTFOUND; goto out1; } if (ret == 0) ret = __ham_item_next(hashp, hcp, DB_LOCK_READ); break; case DB_SET: case DB_SET_RANGE: /* Key not found. */ ret = DB_NOTFOUND; goto out1; } } out1: if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0) ret = t_ret; out: if (ret) *hcp = save_curs; RELEASE_META(hashp->dbp, hashp); if (F_ISSET(cursor->dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } static int __ham_c_put(cursor, key, data, flags) DBC *cursor; DBT *key; DBT *data; u_int32_t flags; { DB *ldbp; HASH_CURSOR *hcp, save_curs; HTAB *hashp; u_int32_t nbytes; int ret, t_ret; DEBUG_LWRITE(cursor->dbp, cursor->txn, "ham_c_put", flags == DB_KEYFIRST || flags == DB_KEYLAST ? key : NULL, data, flags); ldbp = cursor->dbp; if (F_ISSET(cursor->dbp, DB_AM_THREAD) && (ret = __db_gethandle(cursor->dbp, __ham_hdup, &ldbp)) != 0) return (ret); hashp = (HTAB *)(ldbp->internal); hcp = (HASH_CURSOR *)cursor->internal; save_curs = *hcp; if ((ret = __db_cputchk(hashp->dbp, key, data, flags, F_ISSET(ldbp, DB_AM_RDONLY), IS_VALID(hcp))) != 0) return (ret); if (F_ISSET(hcp, H_DELETED)) return (DB_NOTFOUND); SET_LOCKER(hashp->dbp, cursor->txn); GET_META(hashp->dbp, hashp); ret = 0; switch (flags) { case DB_KEYLAST: case DB_KEYFIRST: nbytes = (ISBIG(hashp, key->size) ? HOFFPAGE_PSIZE : HKEYDATA_PSIZE(key->size)) + (ISBIG(hashp, data->size) ? HOFFPAGE_PSIZE : HKEYDATA_PSIZE(data->size)); ret = __ham_lookup(hashp, hcp, key, nbytes, DB_LOCK_WRITE); break; case DB_BEFORE: case DB_AFTER: case DB_CURRENT: ret = __ham_item(hashp, hcp, DB_LOCK_WRITE); break; } if (ret == 0) { if (flags == DB_CURRENT && !F_ISSET(ldbp, DB_AM_DUP)) ret = __ham_overwrite(hashp, hcp, data); else ret = __ham_add_dup(hashp, hcp, data, flags); } if (ret == 0 && F_ISSET(hcp, H_EXPAND)) { ret = __ham_expand_table(hashp); F_CLR(hcp, H_EXPAND); } if ((t_ret = __ham_item_done(hashp, hcp, ret == 0)) != 0 && ret == 0) ret = t_ret; if (ret != 0) *hcp = save_curs; RELEASE_META(hashp->dbp, hashp); if (F_ISSET(cursor->dbp, DB_AM_THREAD)) __db_puthandle(ldbp); return (ret); } /********************************* UTILITIES ************************/ /* * __ham_expand_table -- * * PUBLIC: int __ham_expand_table __P((HTAB *)); */ int __ham_expand_table(hashp) HTAB *hashp; { DB_LSN new_lsn; u_int32_t old_bucket, new_bucket, spare_ndx; int ret; ret = 0; DIRTY_META(hashp, ret); if (ret) return (ret); /* * If the split point is about to increase, make sure that we * have enough extra pages. The calculation here is weird. * We'd like to do this after we've upped max_bucket, but it's * too late then because we've logged the meta-data split. What * we'll do between then and now is increment max bucket and then * see what the log of one greater than that is; here we have to * look at the log of max + 2. VERY NASTY STUFF. */ if (__db_log2(hashp->hdr->max_bucket + 2) > hashp->hdr->ovfl_point) { /* * We are about to shift the split point. Make sure that * if the next doubling is going to be big (more than 8 * pages), we have some extra pages around. */ if (hashp->hdr->max_bucket + 1 >= 8 && hashp->hdr->spares[hashp->hdr->ovfl_point] < hashp->hdr->spares[hashp->hdr->ovfl_point - 1] + hashp->hdr->ovfl_point + 1) __ham_init_ovflpages(hashp); } /* Now we can log the meta-data split. */ if (DB_LOGGING(hashp->dbp)) { if ((ret = __ham_splitmeta_log(hashp->dbp->dbenv->lg_info, (DB_TXN *)hashp->dbp->txn, &new_lsn, 0, hashp->dbp->log_fileid, hashp->hdr->max_bucket, hashp->hdr->ovfl_point, hashp->hdr->spares[hashp->hdr->ovfl_point], &hashp->hdr->lsn)) != 0) return (ret); hashp->hdr->lsn = new_lsn; } hashp->hash_expansions++; new_bucket = ++hashp->hdr->max_bucket; old_bucket = (hashp->hdr->max_bucket & hashp->hdr->low_mask); /* * If the split point is increasing, copy the current contents * of the spare split bucket to the next bucket. */ spare_ndx = __db_log2(hashp->hdr->max_bucket + 1); if (spare_ndx > hashp->hdr->ovfl_point) { hashp->hdr->spares[spare_ndx] = hashp->hdr->spares[hashp->hdr->ovfl_point]; hashp->hdr->ovfl_point = spare_ndx; } if (new_bucket > hashp->hdr->high_mask) { /* Starting a new doubling */ hashp->hdr->low_mask = hashp->hdr->high_mask; hashp->hdr->high_mask = new_bucket | hashp->hdr->low_mask; } if (BUCKET_TO_PAGE(hashp, new_bucket) > MAX_PAGES(hashp)) { __db_err(hashp->dbp->dbenv, "hash: Cannot allocate new bucket. Pages exhausted."); return (ENOSPC); } /* Relocate records to the new bucket */ return (__ham_split_page(hashp, old_bucket, new_bucket)); } /* * PUBLIC: u_int32_t __ham_call_hash __P((HTAB *, u_int8_t *, int32_t)); */ u_int32_t __ham_call_hash(hashp, k, len) HTAB *hashp; u_int8_t *k; int32_t len; { u_int32_t n, bucket; n = (u_int32_t)hashp->hash(k, len); bucket = n & hashp->hdr->high_mask; if (bucket > hashp->hdr->max_bucket) bucket = bucket & hashp->hdr->low_mask; return (bucket); } /* * Check for duplicates, and call __db_ret appropriately. Release * everything held by the cursor. */ static int __ham_dup_return(hashp, hcp, val, flags) HTAB *hashp; HASH_CURSOR *hcp; DBT *val; u_int32_t flags; { PAGE *pp; DBT *myval, tmp_val; db_indx_t ndx; db_pgno_t pgno; u_int8_t *hk, type; int ret; db_indx_t len; /* Check for duplicate and return the first one. */ ndx = H_DATAINDEX(hcp->bndx); type = HPAGE_TYPE(hcp->pagep, ndx); pp = hcp->pagep; myval = val; /* * There are 3 cases: * 1. We are not in duplicate, simply call db_ret. * 2. We are looking at keys and stumbled onto a duplicate. * 3. We are in the middle of a duplicate set. (ISDUP set) */ /* * Here we check for the case where we just stumbled onto a * duplicate. In this case, we do initialization and then * let the normal duplicate code handle it. */ if (!F_ISSET(hcp, H_ISDUP)) if (type == H_DUPLICATE) { F_SET(hcp, H_ISDUP); hcp->dup_tlen = LEN_HDATA(hcp->pagep, hashp->hdr->pagesize, hcp->bndx); hk = H_PAIRDATA(hcp->pagep, hcp->bndx); if (flags == DB_LAST || flags == DB_PREV) { hcp->dndx = 0; hcp->dup_off = 0; do { memcpy(&len, HKEYDATA_DATA(hk) + hcp->dup_off, sizeof(db_indx_t)); hcp->dup_off += DUP_SIZE(len); hcp->dndx++; } while (hcp->dup_off < hcp->dup_tlen); hcp->dup_off -= DUP_SIZE(len); hcp->dndx--; } else { memcpy(&len, HKEYDATA_DATA(hk), sizeof(db_indx_t)); hcp->dup_off = 0; hcp->dndx = 0; } hcp->dup_len = len; } else if (type == H_OFFDUP) { F_SET(hcp, H_ISDUP); memcpy(&pgno, HOFFDUP_PGNO(P_ENTRY(hcp->pagep, ndx)), sizeof(db_pgno_t)); if (flags == DB_LAST || flags == DB_PREV) { if ((ret = __db_dend(hashp->dbp, pgno, &hcp->dpagep)) != 0) return (ret); hcp->dpgno = PGNO(hcp->dpagep); hcp->dndx = NUM_ENT(hcp->dpagep) - 1; } else if ((ret = __ham_next_cpage(hashp, hcp, pgno, 0, H_ISDUP)) != 0) return (ret); } /* * Now, everything is initialized, grab a duplicate if * necessary. */ if (F_ISSET(hcp, H_ISDUP)) if (hcp->dpgno != PGNO_INVALID) { pp = hcp->dpagep; ndx = hcp->dndx; } else { /* * Copy the DBT in case we are retrieving into * user memory and we need the parameters for * it. */ memcpy(&tmp_val, val, sizeof(*val)); F_SET(&tmp_val, DB_DBT_PARTIAL); tmp_val.dlen = hcp->dup_len; tmp_val.doff = hcp->dup_off + sizeof(db_indx_t); myval = &tmp_val; } /* * Finally, if we had a duplicate, pp, ndx, and myval should be * set appropriately. */ if ((ret = __db_ret(hashp->dbp, pp, ndx, myval, &hcp->big_data, &hcp->big_datalen)) != 0) return (ret); /* * In case we sent a temporary off to db_ret, set the real * return values. */ val->data = myval->data; val->size = myval->size; return (0); } static int __ham_overwrite(hashp, hcp, nval) HTAB *hashp; HASH_CURSOR *hcp; DBT *nval; { DBT *myval, tmp_val; u_int8_t *hk; if (F_ISSET(hashp->dbp, DB_AM_DUP)) return (__ham_add_dup(hashp, hcp, nval, DB_KEYLAST)); else if (!F_ISSET(nval, DB_DBT_PARTIAL)) { /* Put/overwrite */ memcpy(&tmp_val, nval, sizeof(*nval)); F_SET(&tmp_val, DB_DBT_PARTIAL); tmp_val.doff = 0; hk = H_PAIRDATA(hcp->pagep, hcp->bndx); if (HPAGE_PTYPE(hk) == H_OFFPAGE) memcpy(&tmp_val.dlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); else tmp_val.dlen = LEN_HDATA(hcp->pagep, hashp->hdr->pagesize,hcp->bndx); myval = &tmp_val; } else /* Regular partial put */ myval = nval; return (__ham_replpair(hashp, hcp, myval, 0)); } /* * Given a key and a cursor, sets the cursor to the page/ndx on which * the key resides. If the key is found, the cursor H_OK flag is set * and the pagep, bndx, pgno (dpagep, dndx, dpgno) fields are set. * If the key is not found, the H_OK flag is not set. If the sought * field is non-0, the pagep, bndx, pgno (dpagep, dndx, dpgno) fields * are set indicating where an add might take place. If it is 0, * non of the cursor pointer field are valid. */ static int __ham_lookup(hashp, hcp, key, sought, mode) HTAB *hashp; HASH_CURSOR *hcp; const DBT *key; u_int32_t sought; db_lockmode_t mode; { db_pgno_t pgno; u_int32_t tlen; int match, ret, t_ret; u_int8_t *hk; /* * Set up cursor so that we're looking for space to add an item * as we cycle through the pages looking for the key. */ if ((ret = __ham_item_reset(hashp, hcp)) != 0) return (ret); hcp->seek_size = sought; hcp->bucket = __ham_call_hash(hashp, (u_int8_t *)key->data, key->size); while (1) { if ((ret = __ham_item_next(hashp, hcp, mode)) != 0) return (ret); if (F_ISSET(hcp, H_NOMORE)) break; hk = H_PAIRKEY(hcp->pagep, hcp->bndx); switch (HPAGE_PTYPE(hk)) { case H_OFFPAGE: memcpy(&tlen, HOFFPAGE_TLEN(hk), sizeof(u_int32_t)); if (tlen == key->size) { memcpy(&pgno, HOFFPAGE_PGNO(hk), sizeof(db_pgno_t)); match = __db_moff(hashp->dbp, key, pgno); if (match == 0) { F_SET(hcp, H_OK); return (0); } } break; case H_KEYDATA: if (key->size == LEN_HKEY(hcp->pagep, hashp->hdr->pagesize, hcp->bndx) && memcmp(key->data, HKEYDATA_DATA(hk), key->size) == 0) { F_SET(hcp, H_OK); return (0); } break; case H_DUPLICATE: case H_OFFDUP: /* * These are errors because keys are never * duplicated, only data items are. */ return (__db_pgfmt(hashp->dbp, PGNO(hcp->pagep))); } hashp->hash_collisions++; } /* * Item was not found, adjust cursor properly. */ if (sought != 0) return (ret); if ((t_ret = __ham_item_done(hashp, hcp, 0)) != 0 && ret == 0) ret = t_ret; return (ret); } /* * Initialize a dbt using some possibly already allocated storage * for items. * PUBLIC: int __ham_init_dbt __P((DBT *, u_int32_t, void **, u_int32_t *)); */ int __ham_init_dbt(dbt, size, bufp, sizep) DBT *dbt; u_int32_t size; void **bufp; u_int32_t *sizep; { memset(dbt, 0, sizeof(*dbt)); if (*sizep < size) { if ((*bufp = (void *)(*bufp == NULL ? __db_malloc(size) : __db_realloc(*bufp, size))) == NULL) { *sizep = 0; return (ENOMEM); } *sizep = size; } dbt->data = *bufp; dbt->size = size; return (0); } /* * Adjust the cursor after an insert or delete. The cursor passed is * the one that was operated upon; we just need to check any of the * others. * * len indicates the length of the item added/deleted * add indicates if the item indicated by the cursor has just been * added (add == 1) or deleted (add == 0). * dup indicates if the addition occurred into a duplicate set. * * PUBLIC: void __ham_c_update * PUBLIC: __P((HASH_CURSOR *, db_pgno_t, u_int32_t, int, int)); */ void __ham_c_update(hcp, chg_pgno, len, add, is_dup) HASH_CURSOR *hcp; db_pgno_t chg_pgno; u_int32_t len; int add, is_dup; { DBC *cp; HTAB *hp; HASH_CURSOR *lcp; int page_deleted; /* * Regular adds are always at the end of a given page, so we never * have to adjust anyone's cursor after a regular add. */ if (!is_dup && add) return; /* * Determine if a page was deleted. If this is a regular update * (i.e., not is_dup) then the deleted page's number will be that in * chg_pgno, and the pgno in the cursor will be different. If this * was an onpage-duplicate, then the same conditions apply. If this * was an off-page duplicate, then we need to verify if hcp->dpgno * is the same (no delete) or different (delete) than chg_pgno. */ if (!is_dup || hcp->dpgno == PGNO_INVALID) page_deleted = chg_pgno != PGNO_INVALID && chg_pgno != hcp->pgno; else page_deleted = chg_pgno != PGNO_INVALID && chg_pgno != hcp->dpgno; hp = hcp->db_cursor->dbp->master->internal; DB_THREAD_LOCK(hp->dbp); for (cp = TAILQ_FIRST(&hp->dbp->curs_queue); cp != NULL; cp = TAILQ_NEXT(cp, links)) { if (cp->internal == hcp) continue; lcp = (HASH_CURSOR *)cp->internal; if (!is_dup && lcp->pgno != chg_pgno) continue; if (is_dup) { if (F_ISSET(hcp, H_DELETED) && lcp->pgno != chg_pgno) continue; if (!F_ISSET(hcp, H_DELETED) && lcp->dpgno != chg_pgno) continue; } if (page_deleted) { if (is_dup) { lcp->dpgno = hcp->dpgno; lcp->dndx = hcp->dndx; } else { lcp->pgno = hcp->pgno; lcp->bndx = hcp->bndx; lcp->bucket = hcp->bucket; } F_CLR(lcp, H_ISDUP); continue; } if (!is_dup && lcp->bndx > hcp->bndx) lcp->bndx--; else if (!is_dup && lcp->bndx == hcp->bndx) F_SET(lcp, H_DELETED); else if (is_dup && lcp->bndx == hcp->bndx) { /* Assign dpgno in case there was page conversion. */ lcp->dpgno = hcp->dpgno; if (add && lcp->dndx >= hcp->dndx ) lcp->dndx++; else if (!add && lcp->dndx > hcp->dndx) lcp->dndx--; else if (!add && lcp->dndx == hcp->dndx) F_SET(lcp, H_DELETED); /* Now adjust on-page information. */ if (lcp->dpgno == PGNO_INVALID) if (add) { lcp->dup_tlen += len; if (lcp->dndx > hcp->dndx) lcp->dup_off += len; } else { lcp->dup_tlen -= len; if (lcp->dndx > hcp->dndx) lcp->dup_off -= len; } } } DB_THREAD_UNLOCK(hp->dbp); } /* * __ham_hdup -- * This function gets called when we create a duplicate handle for a * threaded DB. It should create the private part of the DB structure. * * PUBLIC: int __ham_hdup __P((DB *, DB *)); */ int __ham_hdup(orig, new) DB *orig, *new; { DBC *curs; HTAB *hashp; int ret; if ((hashp = (HTAB *)__db_malloc(sizeof(HTAB))) == NULL) return (ENOMEM); new->internal = hashp; hashp->dbp = new; hashp->hlock = 0; hashp->hdr = NULL; hashp->hash = ((HTAB *)orig->internal)->hash; if ((hashp->split_buf = (PAGE *)__db_malloc(orig->pgsize)) == NULL) return (ENOMEM); hashp->local_errno = 0; hashp->hash_accesses = 0; hashp->hash_collisions = 0; hashp->hash_expansions = 0; hashp->hash_overflows = 0; hashp->hash_bigpages = 0; /* Initialize the cursor queue. */ ret = __ham_c_init(new, NULL, &curs); TAILQ_INSERT_TAIL(&new->curs_queue, curs, links); return (ret); }