From 4a7804bd08c790fc3c4233312e4b485c3302fe02 Mon Sep 17 00:00:00 2001 From: jdybnis Date: Mon, 10 Nov 2008 06:01:41 +0000 Subject: [PATCH 1/1] Initial commit --- CuTest-license.txt | 38 ++++ include/CuTest.h | 109 ++++++++++ include/common.h | 47 +++++ include/ht.h | 25 +++ include/lwt.h | 64 ++++++ include/mem.h | 10 + include/murmur.h | 63 ++++++ include/nbd.h | 9 + include/rcu.h | 10 + include/tls.h | 34 +++ license.txt | 32 +++ makefile | 63 ++++++ struct/ht.c | 514 +++++++++++++++++++++++++++++++++++++++++++++ struct/ht_test.c | 181 ++++++++++++++++ struct/list.c | 278 ++++++++++++++++++++++++ tags | 335 +++++++++++++++++++++++++++++ todo | 7 + util/CuTest.c | 309 +++++++++++++++++++++++++++ util/lwt.c | 111 ++++++++++ util/mem.c | 151 +++++++++++++ util/nbd.c | 24 +++ util/rcu.c | 211 +++++++++++++++++++ 22 files changed, 2625 insertions(+) create mode 100644 CuTest-license.txt create mode 100644 include/CuTest.h create mode 100644 include/common.h create mode 100644 include/ht.h create mode 100644 include/lwt.h create mode 100644 include/mem.h create mode 100644 include/murmur.h create mode 100644 include/nbd.h create mode 100644 include/rcu.h create mode 100644 include/tls.h create mode 100644 license.txt create mode 100644 makefile create mode 100644 struct/ht.c create mode 100644 struct/ht_test.c create mode 100644 struct/list.c create mode 100644 tags create mode 100644 todo create mode 100644 util/CuTest.c create mode 100644 util/lwt.c create mode 100644 util/mem.c create mode 100644 util/nbd.c create mode 100644 util/rcu.c diff --git a/CuTest-license.txt b/CuTest-license.txt new file mode 100644 index 0000000..5f053ba --- /dev/null +++ b/CuTest-license.txt @@ -0,0 +1,38 @@ +NOTE + +The license is based on the zlib/libpng license. For more details see +http://www.opensource.org/licenses/zlib-license.html. 
The intent of the +license is to: + +- keep the license as simple as possible +- encourage the use of CuTest in both free and commercial applications + and libraries +- keep the source code together +- give credit to the CuTest contributors for their work + +If you ship CuTest in source form with your source distribution, the +following license document must be included with it in unaltered form. +If you find CuTest useful we would like to hear about it. + +LICENSE + +Copyright (c) 2003 Asim Jalis + +This software is provided 'as-is', without any express or implied +warranty. In no event will the authors be held liable for any damages +arising from the use of this software. + +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it +freely, subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not +claim that you wrote the original software. If you use this software in +a product, an acknowledgment in the product documentation would be +appreciated but is not required. + +2. Altered source versions must be plainly marked as such, and must not +be misrepresented as being the original software. + +3. This notice may not be removed or altered from any source +distribution. 
diff --git a/include/CuTest.h b/include/CuTest.h new file mode 100644 index 0000000..b315151 --- /dev/null +++ b/include/CuTest.h @@ -0,0 +1,109 @@ +#ifndef CU_TEST_H +#define CU_TEST_H + +#include <setjmp.h> +#include <stdarg.h> + +/* CuString */ + +char* CuStrAlloc(int size); +char* CuStrCopy(const char* old); + +#define CU_ALLOC(TYPE) ((TYPE*) malloc(sizeof(TYPE))) + +#define HUGE_STRING_LEN 8192 +#define STRING_MAX 256 +#define STRING_INC 256 + +typedef struct +{ + int length; + int size; + char* buffer; +} CuString; + +void CuStringInit(CuString* str); +CuString* CuStringNew(void); +void CuStringRead(CuString* str, const char* path); +void CuStringAppend(CuString* str, const char* text); +void CuStringAppendChar(CuString* str, char ch); +void CuStringAppendFormat(CuString* str, const char* format, ...); +void CuStringInsert(CuString* str, const char* text, int pos); +void CuStringResize(CuString* str, int newSize); + +/* CuTest */ + +typedef struct CuTest CuTest; + +typedef void (*TestFunction)(CuTest *); + +struct CuTest +{ + const char* name; + TestFunction function; + int failed; + int ran; + const char* message; + jmp_buf *jumpBuf; +}; + +void CuTestInit(CuTest* t, const char* name, TestFunction function); +CuTest* CuTestNew(const char* name, TestFunction function); +void CuTestRun(CuTest* tc); + +/* Internal versions of assert functions -- use the public versions */ +void CuFail_Line(CuTest* tc, + const char* file, int line, const char* message2, const char* message); +void CuAssert_Line(CuTest* tc, + const char* file, int line, const char* message, int condition); +void CuAssertStrEquals_LineMsg(CuTest* tc, + const char* file, int line, const char* message, const char* expected, const char* actual); +void CuAssertIntEquals_LineMsg(CuTest* tc, + const char* file, int line, const char* message, int expected, int actual); +void CuAssertDblEquals_LineMsg(CuTest* tc, + const char* file, int line, const char* message, double expected, double actual, double delta); +void
CuAssertPtrEquals_LineMsg(CuTest* tc, + const char* file, int line, const char* message, void* expected, void* actual); + +/* public assert functions */ + +#define CuFail(tc, ms) CuFail_Line( (tc), __FILE__, __LINE__, NULL, (ms)) +#define CuAssert(tc, ms, cond) CuAssert_Line((tc), __FILE__, __LINE__, (ms), (cond)) +#define CuAssertTrue(tc, cond) CuAssert_Line((tc), __FILE__, __LINE__, "assert failed", (cond)) + +#define CuAssertStrEquals(tc,ex,ac) CuAssertStrEquals_LineMsg((tc),__FILE__,__LINE__,NULL,(ex),(ac)) +#define CuAssertStrEquals_Msg(tc,ms,ex,ac) CuAssertStrEquals_LineMsg((tc),__FILE__,__LINE__,(ms),(ex),(ac)) +#define CuAssertIntEquals(tc,ex,ac) CuAssertIntEquals_LineMsg((tc),__FILE__,__LINE__,NULL,(ex),(ac)) +#define CuAssertIntEquals_Msg(tc,ms,ex,ac) CuAssertIntEquals_LineMsg((tc),__FILE__,__LINE__,(ms),(ex),(ac)) +#define CuAssertDblEquals(tc,ex,ac,dl) CuAssertDblEquals_LineMsg((tc),__FILE__,__LINE__,NULL,(ex),(ac),(dl)) +#define CuAssertDblEquals_Msg(tc,ms,ex,ac,dl) CuAssertDblEquals_LineMsg((tc),__FILE__,__LINE__,(ms),(ex),(ac),(dl)) +#define CuAssertPtrEquals(tc,ex,ac) CuAssertPtrEquals_LineMsg((tc),__FILE__,__LINE__,NULL,(ex),(ac)) +#define CuAssertPtrEquals_Msg(tc,ms,ex,ac) CuAssertPtrEquals_LineMsg((tc),__FILE__,__LINE__,(ms),(ex),(ac)) + +#define CuAssertPtrNotNull(tc,p) CuAssert_Line((tc),__FILE__,__LINE__,"null pointer unexpected",(p != NULL)) +#define CuAssertPtrNotNullMsg(tc,msg,p) CuAssert_Line((tc),__FILE__,__LINE__,(msg),(p != NULL)) + +/* CuSuite */ + +#define MAX_TEST_CASES 1024 + +#define SUITE_ADD_TEST(SUITE,TEST) CuSuiteAdd(SUITE, CuTestNew(#TEST, TEST)) + +typedef struct +{ + int count; + CuTest* list[MAX_TEST_CASES]; + int failCount; + +} CuSuite; + + +void CuSuiteInit(CuSuite* testSuite); +CuSuite* CuSuiteNew(void); +void CuSuiteAdd(CuSuite* testSuite, CuTest *testCase); +void CuSuiteAddSuite(CuSuite* testSuite, CuSuite* testSuite2); +void CuSuiteRun(CuSuite* testSuite); +void CuSuiteSummary(CuSuite* testSuite, CuString* summary);
+void CuSuiteDetails(CuSuite* testSuite, CuString* details); + +#endif /* CU_TEST_H */ diff --git a/include/common.h b/include/common.h new file mode 100644 index 0000000..aea1f67 --- /dev/null +++ b/include/common.h @@ -0,0 +1,47 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#ifndef COMMON_H +#define COMMON_H + +#include +#include +#include +#include +#include + +#define MAX_NUM_THREADS 4 // make this whatever you want, but make it a power of 2 + +#define CACHE_LINE_SIZE 64 + +#define CAT(x,y) x##y +#define ON_EXIT_SCOPE_I(x,i) \ + inline void CAT(scope_cleanup_function_, i) (int *CAT(scope_cleanup_dummy_argument_, i)) { x; }; \ + int CAT(scope_cleanup_dummy_variable_, i) __attribute__((cleanup(CAT(scope_cleanup_function_, i)))); +#define ON_EXIT_SCOPE(x) ON_EXIT_SCOPE_I(x,__LINE__) + +#define EXPECT_TRUE(x) __builtin_expect(x, 1) +#define EXPECT_FALSE(x) __builtin_expect(x, 0) + +#define SYNC_SWAP __sync_lock_test_and_set +#define SYNC_CAS __sync_val_compare_and_swap +#define SYNC_ADD __sync_add_and_fetch +#define SYNC_FETCH_AND_OR __sync_fetch_and_or + +#define MASK(n) ((1LL << (n)) - 1) + +#define TAG (-0x7FFFFFFFFFFFFFFFLL - 1) // only the sign bit set (INT64_MIN); spelled this way because 1LL << 63 overflows a signed type, which is undefined behavior +#define IS_TAGGED(v) ((int64_t)(v) < 0) +#define TAG_VALUE(v) ((int64_t)(v) | TAG) +#define STRIP_TAG(v) ((int64_t)(v) & ~TAG) + +#define TRUE 1 +#define FALSE 0 + +typedef long long int64_t; +typedef unsigned long long uint64_t; +typedef unsigned int uint32_t; + +#include "lwt.h" +#endif //COMMON_H diff --git a/include/ht.h b/include/ht.h new file mode 100644 index 0000000..b396ce8 --- /dev/null +++ b/include/ht.h @@ -0,0 +1,25 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#ifndef HT_H +#define HT_H + +#include "common.h" + +#define DOES_NOT_EXIST 0 + +#define HT_EXPECT_NOT_EXISTS ( 0) +#define HT_EXPECT_EXISTS (-1) +#define HT_EXPECT_WHATEVER (-2) +
+typedef struct hash_table_i *hash_table_t; + +hash_table_t *ht_alloc (void); +void ht_free (hash_table_t *ht); +int64_t ht_get (hash_table_t *ht, const char *key, uint32_t len); +int64_t ht_compare_and_set (hash_table_t *ht, const char *key, uint32_t key_len, int64_t expected_val, int64_t val); +int64_t ht_remove (hash_table_t *ht, const char *key, uint32_t len); +uint64_t ht_count (hash_table_t *ht); + +#endif//HT_H diff --git a/include/lwt.h b/include/lwt.h new file mode 100644 index 0000000..789c667 --- /dev/null +++ b/include/lwt.h @@ -0,0 +1,64 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * lightweight tracing + */ +#ifndef LWT_H +#define LWT_H +#include "tls.h" + +#ifndef ENABLE_TRACE +#define TRACE(...) do { } while (0) +#else +#define TRACE(flag, format, v1, v2) lwt_trace(flag, format, (size_t)(v1), (size_t)(v2)) +#endif + +#define LWT_BUFFER_SCALE 16 +#define LWT_BUFFER_SIZE (1 << LWT_BUFFER_SCALE) +#define LWT_BUFFER_MASK (LWT_BUFFER_SIZE - 1) + +typedef struct lwt_record { + uint64_t timestamp; + const char *format; + size_t value1; + size_t value2; +} lwt_record_t; + +typedef struct lwt_buffer { + uint32_t head; + lwt_record_t x[0]; +} lwt_buffer_t; + +void lwt_init (void); +void lwt_thread_init (int thread_id); + +void lwt_dump (const char *file_name) __attribute__ ((externally_visible)); + +// `flags` indicates what kind of trace messages should be included in the dump. `flags` is a sequence of letters +// followed by numbers (e.g. "x1c9n2g3"). The letters indicate trace categories and the numbers are trace levels +// for each category. If a category appears in `flags`, then messages from that category will be included in the +// dump if they have a trace level less than or equal to the one specified in `flags`. Categories are case +// sensitive. +void lwt_set_trace_level (const char *flags); + +// `flag` is a two character string containing a letter followed by a number (e.g. "f3").
The letter indicates a +// trace category, and the number a trace level. `flag` controls whether or not the trace message gets included in +// the dump. It is only included when its specified category is enabled at a trace level greater than or equal to +// the one in `flag`. Categories are case sensitive. +static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) { + extern uint64_t flag_mask_; + if (EXPECT_FALSE(flag_mask_ & (1ULL << (flag[0] - 'A')))) { // 64-bit shift: lowercase categories ('a'..'z') map to bits 32..57 + LOCALIZE_THREAD_LOCAL(tb_, lwt_buffer_t *); + unsigned int u, l; + __asm__ __volatile__("rdtsc" : "=a" (l), "=d" (u)); + uint64_t timestamp = ((uint64_t)u << 32) | l; + // embed `flag` in the top 16 bits of `format` so we don't have to make the lwt_record_t any bigger than it already is + format = (const char *)((size_t)format | ((uint64_t)flag[0] << 56) | ((uint64_t)flag[1] << 48)); + lwt_record_t temp = { timestamp, format, value1, value2 }; + if (tb_) { + tb_->x[tb_->head++ & LWT_BUFFER_MASK] = temp; + } + } +} +#endif//LWT_H diff --git a/include/mem.h b/include/mem.h new file mode 100644 index 0000000..b04602c --- /dev/null +++ b/include/mem.h @@ -0,0 +1,10 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#ifndef MEM_H +#define MEM_H +void mem_init (void); +void nbd_free (void *x); +void *nbd_malloc (size_t n); +#endif//MEM_H diff --git a/include/murmur.h b/include/murmur.h new file mode 100644 index 0000000..fcb4cb6 --- /dev/null +++ b/include/murmur.h @@ -0,0 +1,63 @@ +//----------------------------------------------------------------------------- +// MurmurHash2, by Austin Appleby + +// Note - This code makes a few assumptions about how your machine behaves - + +// 1. We can read a 4-byte value from any address without crashing +// 2. sizeof(int) == 4 + +// And it has a few limitations - + +// 1. It will not work incrementally. +// 2. It will not produce the same results on little-endian and big-endian +// machines.
+ +static inline unsigned int murmur32 (const char *key, int len) +{ + // 'm' and 'r' are mixing constants generated offline. + // They're not really 'magic', they just happen to work well. + + const unsigned int m = 0x5bd1e995; + const int r = 24; + + // Initialize the hash to a 'random' value + unsigned int h = len; + + // Mix 4 bytes at a time into the hash + + const unsigned char *data = (const unsigned char *)key; + + while(len >= 4) + { + uint32_t k = *(uint32_t *)data; + + k *= m; + k ^= k >> r; + k *= m; + + h *= m; + h ^= k; + + data += 4; + len -= 4; + } + + // Handle the last few bytes of the input array + + switch(len) + { + case 3: h ^= data[2] << 16; /* fallthrough */ + case 2: h ^= data[1] << 8; /* fallthrough */ + case 1: h ^= data[0]; + h *= m; + }; + + // Do a few final mixes of the hash to ensure the last few + // bytes are well-incorporated. + + h ^= h >> 13; + h *= m; + h ^= h >> 15; + + return h; +} diff --git a/include/nbd.h b/include/nbd.h new file mode 100644 index 0000000..a6e1dcc --- /dev/null +++ b/include/nbd.h @@ -0,0 +1,9 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#ifndef NBD_H +#define NBD_H +void nbd_init (void); +void nbd_thread_init (int id); +#endif//NBD_H diff --git a/include/rcu.h b/include/rcu.h new file mode 100644 index 0000000..dd3f121 --- /dev/null +++ b/include/rcu.h @@ -0,0 +1,10 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#ifndef RCU_H +#define RCU_H +void rcu_thread_init (int thread_id); +void rcu_update (void); +void nbd_defer_free (void *x); +#endif//RCU_H diff --git a/include/tls.h b/include/tls.h new file mode 100644 index 0000000..def2bec --- /dev/null +++ b/include/tls.h @@ -0,0 +1,34 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * A platform independent
wrapper around thread-local storage. On platforms that don't support + * __thread variables (e.g. Mac OS X), we have to use the pthreads library for thread-local storage + */ +#ifndef TLS_H +#define TLS_H + +#ifdef __ELF__ // use gcc thread-local storage (i.e. __thread variables) +#define DECLARE_THREAD_LOCAL(name, type) __thread type name // must agree with the extern __thread declaration in LOCALIZE_THREAD_LOCAL +#define INIT_THREAD_LOCAL(name, value) name = value +#define SET_THREAD_LOCAL(name, value) name = value +#define LOCALIZE_THREAD_LOCAL(name, type) extern __thread type name + +#else//!__ELF__ + +#include <pthread.h> + +#define DECLARE_THREAD_LOCAL(name, type) pthread_key_t name##_KEY + +#define INIT_THREAD_LOCAL(name, value) \ + do { /* pthread_key_create()'s 2nd argument is a destructor, not an initial value */ \ + if (pthread_key_create(&name##_KEY, NULL) != 0) { assert(FALSE); } \ + pthread_setspecific(name##_KEY, (void *)(size_t)(value)); } while (0) + +#define SET_THREAD_LOCAL(name, value) pthread_setspecific(name##_KEY, (void *)(size_t)(value)) + +#define LOCALIZE_THREAD_LOCAL(name, type) \ + extern pthread_key_t name##_KEY; type name = (type)(size_t)pthread_getspecific(name##_KEY) + +#endif//__ELF__ +#endif//TLS_H diff --git a/license.txt b/license.txt new file mode 100644 index 0000000..ac93118 --- /dev/null +++ b/license.txt @@ -0,0 +1,32 @@ +Code in this distribution that is written by Josh Dybnis is released to the +public domain, as explained at http://creativecommons.org/licenses/publicdomain +which is repeated below: + + The person or persons who have associated work with this document (the + "Dedicator" or "Certifier") hereby either (a) certifies that, to the + best of his knowledge, the work of authorship identified is in the + public domain of the country from which the work is published, or (b) + hereby dedicates whatever copyright the dedicators holds in the work of + authorship identified below (the "Work") to the public domain. A + certifier, moreover, dedicates any copyright interest he may have in the + associated work, and for these purposes, is described as a "dedicator" + below.
+ + A certifier has taken reasonable steps to verify the copyright status of + this work. Certifier recognizes that his good faith efforts may not + shield him from liability if in fact the work certified is not in the + public domain. + + Dedicator makes this dedication for the benefit of the public at large + and to the detriment of the Dedicator's heirs and successors. Dedicator + intends this dedication to be an overt act of relinquishment in + perpetuity of all present and future rights under copyright law, + whether vested or contingent, in the Work. Dedicator understands that + such relinquishment of all rights includes the relinquishment of all + rights to enforce (by lawsuit or otherwise) those copyrights in the Work. + + Dedicator recognizes that, once placed in the public domain, the Work may + be freely reproduced, distributed, transmitted, used, modified, built + upon, or otherwise exploited by anyone for any purpose, commercial or + non-commercial, and in any way, including by methods that have not yet + been invented or conceived. 
diff --git a/makefile b/makefile new file mode 100644 index 0000000..a59466f --- /dev/null +++ b/makefile @@ -0,0 +1,63 @@ +################################################################################################### +# Written by Josh Dybnis and released to the public domain, as explained at +# http://creativecommons.org/licenses/publicdomain +# +################################################################################################### +# Makefile for building programs with whole-program interfile optimization +################################################################################################### +OPT := -fwhole-program -combine -O3 #-DNDEBUG +CFLAGS := -g -Wall -Werror -std=c99 -m64 -fnested-functions $(OPT) #-DENABLE_TRACE +INCS := $(addprefix -I, include) +TESTS := output/rcu_test output/list_test output/ht_test +EXES := $(TESTS) + +UTIL_SRCS := util/nbd.c util/rcu.c util/lwt.c util/CuTest.c util/mem.c +rcu_test_SRCS := $(UTIL_SRCS) +list_test_SRCS := $(UTIL_SRCS) struct/list.c +ht_test_SRCS := $(UTIL_SRCS) struct/ht.c struct/ht_test.c + +tests: $(TESTS) + +################################################################################################### +# Run the tests +################################################################################################### +test: $(addsuffix .log, $(TESTS)) + @echo > /dev/null + +$(addsuffix .log, $(TESTS)) : %.log : % + @echo "Running $*" && $* | tee $*.log + +################################################################################################### +# Rebuild an executable if any of its source files need to be recompiled +# +# Note: Calculating dependencies as a side-effect of compilation is disabled. There is a bug in +# gcc. Compilation fails when -MM -MF is used and there is more than one source file.
+# -MM -MT $@.d -MF $@.d +################################################################################################### +$(EXES): output/% : output/%.d makefile + gcc $(CFLAGS) $(INCS) -DMAKE_$* -o $@ $($*_SRCS) + +################################################################################################### +# Build tags file for vi +################################################################################################### +tags: + ctags -R . + +################################################################################################### +# +################################################################################################### +clean: + rm -rfv output/* + +.PHONY: clean test tags + +################################################################################################### +# Generate the dependencies lists for each executable +# +# Note: -combine is removed from CFLAGS because of a bug in gcc. The compiler chokes when +# -MM is used with -combine. +################################################################################################### +$(addsuffix .d, $(EXES)) : output/%.d : + gcc $(filter-out -combine,$(CFLAGS)) $(INCS) -DMAKE_$* -MM -MT $@ $($*_SRCS) > $@ + +-include $(addsuffix .d, $(EXES)) diff --git a/struct/ht.c b/struct/ht.c new file mode 100644 index 0000000..8d5e160 --- /dev/null +++ b/struct/ht.c @@ -0,0 +1,514 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * C implementation of Cliff Click's lock-free hash table from + * http://www.azulsystems.com/events/javaone_2008/2008_CodingNonBlock.pdf + * http://sourceforge.net/projects/high-scale-lib + * + * Note: This code uses synchronous atomic operations because that is all that x86 provides. + * Every atomic operation is also an implicit full memory barrier.
The upshot is that it simplifies + * the code a bit, but it won't be as fast as it could be on platforms like SPARC that provide + * weaker operations which would still do the job. + */ + +#include "common.h" +#include "ht.h" +#include "murmur.h" +#include "mem.h" +#include "rcu.h" + +#define COPIED_VALUE (-1) +#define TOMBSTONE STRIP_TAG(COPIED_VALUE) + +#define ENTRIES_PER_BUCKET (CACHE_LINE_SIZE/sizeof(entry_t)) +#define ENTRIES_PER_COPY_CHUNK (ENTRIES_PER_BUCKET * 2) +#define MIN_SCALE (__builtin_ctz(ENTRIES_PER_BUCKET) + 2) // min 4 buckets +#define MAX_BUCKETS_TO_PROBE 250 + +typedef struct ht_entry { + int64_t key; + int64_t value; +} entry_t; + +typedef struct string { + uint32_t len; + char val[]; +} string_t; + +typedef struct hash_table_i { + volatile entry_t *table; + hash_table_t *ht; // parent ht; + struct hash_table_i *next; + struct hash_table_i *next_free; + unsigned int scale; + int max_probe; + int count; // TODO: make these counters distributed + int num_entries_copied; + int scan; +} hash_table_i_t; + +static int hti_copy_entry + (hash_table_i_t *old_hti, volatile entry_t *e, uint32_t e_key_hash, hash_table_i_t *new_hti); + +// Choose the next bucket to probe using the high-order bits of `key_hash`. +static inline int get_next_ndx(int old_ndx, uint32_t key_hash, int ht_scale) { + int incr = (key_hash >> (32 - ht_scale)); + incr += !incr; // If the increment is 0, make it 1. + return (old_ndx + incr) & MASK(ht_scale); +} + +// Compare two keys. +// +// A key is made up of two parts. The 48 low-order bits are a pointer to a null terminated string. +// The high-order 16 bits are taken from the hash of that string. The bits from the hash are used +// as a quick check to rule out non-equal keys without doing a complete string compare.
+static inline int ht_key_equals (uint64_t a, uint32_t b_hash, const char *b_value, uint32_t b_len) { + if ((b_hash >> 16) != (a >> 48)) // high-order 16 bits are from the hash value + return FALSE; + const string_t *a_key = (string_t *)(a & MASK(48)); // low-order 48 bits is a pointer + assert(a_key); + return a_key->len == b_len && memcmp(a_key->val, b_value, b_len) == 0; +} + +// Lookup the key (key_hash, key_val, key_len) in `hti`. +// +// Return the entry that the key is in, or if the key isn't in `hti` return the entry that it would be +// in if it were inserted into `hti`. If there is no room for the key in `hti` then return NULL, to +// indicate that the caller should look in `hti->next`. +// +// Set `*is_empty` when the returned entry is empty, so the caller doesn't have to waste time with +// ht_key_equals() to confirm that it did not lose a race to fill an empty entry. +static volatile entry_t *hti_lookup (hash_table_i_t *hti, uint32_t key_hash, const char *key_val, uint32_t key_len, int *is_empty) { + TRACE("h0", "hti_lookup(key \"%s\" in hti %p)", key_val, hti); + *is_empty = 0; + + // Probe one cache line at a time + int ndx = key_hash & MASK(hti->scale); // the first entry to search + int i; + for (i = 0; i < hti->max_probe; ++i) { + + // The start of the bucket is the first entry in the cache line. + volatile entry_t *bucket = hti->table + (ndx & ~(ENTRIES_PER_BUCKET-1)); + + // Start searching at the indexed entry.
Then loop around to the beginning of the cache line. + int j; + for (j = 0; j < ENTRIES_PER_BUCKET; ++j) { + volatile entry_t *e = bucket + ((ndx + j) & (ENTRIES_PER_BUCKET-1)); + + uint64_t e_key = e->key; + if (e_key == DOES_NOT_EXIST) { + TRACE("h0", "hti_lookup: empty entry %p found on probe %d", e, i*ENTRIES_PER_BUCKET+j+1); + // report the entry as empty so the caller can avoid an expensive ht_key_equals() + *is_empty = 1; + return e; + } + + if (ht_key_equals(e_key, key_hash, key_val, key_len)) { + TRACE("h0", "hti_lookup: entry %p found on probe %d", e, i*ENTRIES_PER_BUCKET+j+1); + TRACE("h0", "hti_lookup: with key \"%s\" value %p", + ((string_t *)(e_key & MASK(48)))->val, e->value); + return e; + } + } + + ndx = get_next_ndx(ndx, key_hash, hti->scale); + } + + // maximum number of probes exceeded + TRACE("h0", "hti_lookup: maximum number of probes exceeded returning 0x0", 0, 0); + return NULL; +} + +// Allocate and initialize a hash_table_i_t with 2^scale entries. +static hash_table_i_t *hti_alloc (hash_table_t *parent, int scale) { + // Include enough slop to align the actual table on a cache line boundary + size_t n = sizeof(hash_table_i_t) + + sizeof(entry_t) * ((size_t)1 << scale) + + (CACHE_LINE_SIZE - 1); + hash_table_i_t *hti = (hash_table_i_t *)calloc(n, 1); + + // Align the table of hash entries on a cache line boundary. + hti->table = (entry_t *)(((uint64_t)hti + sizeof(hash_table_i_t) + (CACHE_LINE_SIZE-1)) + & ~(CACHE_LINE_SIZE-1)); + + hti->scale = scale; + + // When searching for a key probe a maximum of 1/4 of the buckets (plus 2), capped at MAX_BUCKETS_TO_PROBE buckets.
+ size_t max_probe = (((size_t)1 << (hti->scale - 2)) / ENTRIES_PER_BUCKET) + 2; // size_t: the shift overflows int for large scales + if (max_probe > MAX_BUCKETS_TO_PROBE) { + max_probe = MAX_BUCKETS_TO_PROBE; + } + hti->max_probe = (int)max_probe; + hti->ht = parent; + + assert(hti->scale >= MIN_SCALE && hti->scale < 63); // size must be a power of 2 + assert(sizeof(entry_t) * ENTRIES_PER_BUCKET % CACHE_LINE_SIZE == 0); // divisible into cache + assert((size_t)hti->table % CACHE_LINE_SIZE == 0); // cache aligned + + return hti; +} + +// Called when `hti` runs out of room for new keys. +// +// Initiates a copy by creating a new hash_table_i_t (sized by the heuristics below) and installing it in `hti->next`. +static void hti_start_copy (hash_table_i_t *hti) { + TRACE("h0", "hti_start_copy(hti %p hti->next %p)", hti, hti->next); + if (hti->next) + return; // another thread beat us to it + + // heuristics to determine the size of the new table + uint64_t count = ht_count(hti->ht); + unsigned int new_scale = hti->scale; + new_scale += (count > ((uint64_t)1 << (new_scale - 2))); // double size if more than 1/4 full + new_scale += (count > ((uint64_t)1 << (new_scale - 2))); // double size again if more than 1/2 full + + // Allocate the new table and attempt to install it. + hash_table_i_t *next = hti_alloc(hti->ht, new_scale); // use the size chosen by the heuristics above + hash_table_i_t *old_next = SYNC_CAS(&hti->next, NULL, next); + if (old_next != NULL) { + TRACE("h0", "hti_start_copy: lost race to install new hti; found %p", old_next, 0); + // Another thread beat us to it. + free(next); // never published, and hti_alloc() got it from calloc(), so release it with free() + return; + } + TRACE("h0", "hti_start_copy: new hti is %p", next, 0); +} + +// Copy the key and value stored in `old_e` (which must be an entry in `old_hti`) to `new_hti`. +// +// Return 1 unless `old_e` is already copied (then return 0), so the caller can account for the total +// number of entries left to copy.
+static int hti_copy_entry (hash_table_i_t *old_hti, volatile entry_t *old_e, uint32_t key_hash, + hash_table_i_t *new_hti) { + TRACE("h0", "hti_copy_entry(copy entry from %p to %p)", old_hti, new_hti); + assert(old_hti); + assert(old_hti->next); + assert(new_hti); + assert(old_e >= old_hti->table && old_e < old_hti->table + ((size_t)1 << old_hti->scale)); + + int64_t old_e_value = old_e->value; + TRACE("h0", "hti_copy_entry: entry %p current value %p", old_e, old_e_value); + if (EXPECT_FALSE(old_e_value == COPIED_VALUE)) + return FALSE; // already copied + + // Kill empty entries. + if (EXPECT_FALSE(old_e_value == DOES_NOT_EXIST)) { + old_e_value = SYNC_CAS(&old_e->value, DOES_NOT_EXIST, COPIED_VALUE); + if (old_e_value == DOES_NOT_EXIST) { + TRACE("h0", "hti_copy_entry: old entry killed", 0, 0); + return TRUE; + } + if (old_e_value == COPIED_VALUE) { + TRACE("h0", "hti_copy_entry: lost race to kill empty entry in old hti", 0, 0); + return FALSE; // another thread beat us to it + } + TRACE("h0", "hti_copy_entry: lost race to kill empty entry in old hti; " + "the entry is now being used", 0, 0); + } + + // Tag the value in the old entry to indicate a copy is in progress. + old_e_value = SYNC_FETCH_AND_OR(&old_e->value, TAG_VALUE(0)); + TRACE("h0", "hti_copy_entry: tagged the value %p in old entry %p", old_e_value, old_e); + if (old_e_value == COPIED_VALUE) + return FALSE; // was already copied by another thread. + + // Deleted entries don't need to be installed into the new table, but their keys do need to + // be freed. The key pointer stays in the old entry, where concurrent lookups may still read it, so defer the free. + assert(COPIED_VALUE == TAG_VALUE(TOMBSTONE)); + if (old_e_value == TOMBSTONE) { + nbd_defer_free((string_t *)(old_e->key & MASK(48))); + return TRUE; + } + old_e_value = STRIP_TAG(old_e_value); + + // Install the key in the new table. + uint64_t old_e_key = old_e->key; + string_t *key = (string_t *)(old_e_key & MASK(48)); + TRACE("h0", "hti_copy_entry: key %p is %s", old_e_key, key->val); + + // We use 0 to indicate that `key_hash` isn't initialized.
Occasionally the hash will + // really be 0 and we will waste time recomputing it. That is rare enough that it is OK. + if (key_hash == 0) { + key_hash = murmur32(key->val, key->len); + } + + int is_empty; + volatile entry_t *new_e = hti_lookup(new_hti, key_hash, key->val, key->len, &is_empty); + + // it is possible that there is not any room in the new table either + if (EXPECT_FALSE(new_e == NULL)) { + hti_start_copy(new_hti); // initiate nested copy, if not already started + return hti_copy_entry(old_hti, old_e, key_hash, new_hti->next); // recursive tail-call + } + + // if hti_lookup() found an empty entry, try to claim it for the key; retry if another thread got there first + if (is_empty) { + uint64_t new_e_key = SYNC_CAS(&new_e->key, DOES_NOT_EXIST, old_e_key); + if (new_e_key != DOES_NOT_EXIST) { + TRACE("h0", "hti_copy_entry: lost race to CAS key %p into new entry; found %p", + old_e_key, new_e_key); + return hti_copy_entry(old_hti, old_e, key_hash, new_hti); // recursive tail-call + } + } + assert(ht_key_equals(new_e->key, key_hash, key->val, key->len)); + TRACE("h0", "hti_copy_entry: key %p installed in new old hti %p", key->val, new_hti); + + // Copy the value to the entry in the new table. + uint64_t new_e_value = SYNC_CAS(&new_e->value, DOES_NOT_EXIST, old_e_value); + + // If there is a nested copy in progress, we might have installed the key into a dead entry. + if (new_e_value == COPIED_VALUE) + return hti_copy_entry(old_hti, old_e, key_hash, new_hti->next); // recursive tail-call + + // Mark the old entry as dead. + old_e->value = COPIED_VALUE; + + // Update the count if we were the one that completed the copy.
+ if (new_e_value == DOES_NOT_EXIST) { + TRACE("h0", "hti_copy_entry: value %p installed in new hti %p", old_e_value, new_hti); + SYNC_ADD(&old_hti->count, -1); + SYNC_ADD(&new_hti->count, 1); + return TRUE; + } + + TRACE("h0", "hti_copy_entry: lost race to CAS value %p in new hti; found %p", + old_e_value, new_e_value); + return FALSE; // another thread completed the copy +} + +// Compare `expected` with the existing value associated with `key`. If the values match then +// replace the existing value with `new`. If `new` is TOMBSTONE, delete the value associated with +// the key by replacing it with a TOMBSTONE. +// +// Return the previous value associated with `key`, or DOES_NOT_EXIST if `key` is not in the table +// (a deleted key's previous value is reported as TOMBSTONE). If a copy is in progress and `key` has been copied to the next +// table then return COPIED_VALUE. +// +// NOTE: for an ordinary (non-HT_EXPECT_*) `expected`, the returned value equals `expected` iff the set succeeds +// +// Certain values of `expected` have special meaning. If `expected` is HT_EXPECT_EXISTS then any +// real value matches (i.e. not a TOMBSTONE or DOES_NOT_EXIST) as long as `key` is in the table. If +// `expected` is HT_EXPECT_WHATEVER then skip the test entirely; HT_EXPECT_NOT_EXISTS matches only when `key` has no live value. +// +static int64_t hti_compare_and_set (hash_table_i_t *hti, uint32_t key_hash, const char *key_val, + uint32_t key_len, int64_t expected, int64_t new) { + TRACE("h0", "hti_compare_and_set(hti %p key \"%s\")", hti, key_val); + TRACE("h0", "hti_compare_and_set(new value %p; caller expects value %p)", new, expected); + assert(hti); + assert(new != DOES_NOT_EXIST && !IS_TAGGED(new)); + assert(key_val); + + int is_empty; + volatile entry_t *e = hti_lookup(hti, key_hash, key_val, key_len, &is_empty); + + // There is no room for `key`, grow the table and try again. + if (e == NULL) { + hti_start_copy(hti); + return COPIED_VALUE; + } + + // Install `key` in the table if it doesn't exist.
+ if (is_empty) { + TRACE("h0", "hti_compare_and_set: entry %p is empty", e, 0); + if (expected != HT_EXPECT_WHATEVER && expected != HT_EXPECT_NOT_EXISTS) + return DOES_NOT_EXIST; + + // No need to do anything, `key` is already deleted. + if (new == TOMBSTONE) + return DOES_NOT_EXIST; + + // allocate a copy of `key` (+1 byte for a NUL terminator). + string_t *key = nbd_malloc(sizeof(uint32_t) + key_len + 1); + key->len = key_len; + memcpy(key->val, key_val, key_len); + key->val[key_len] = '\0'; // keys are documented as null terminated and are printed with %s by TRACE + // CAS `key` into the table. + uint64_t e_key = SYNC_CAS(&e->key, DOES_NOT_EXIST, ((uint64_t)(key_hash >> 16) << 48) | (uint64_t)key); + + // Retry if another thread stole the entry out from under us. + if (e_key != DOES_NOT_EXIST) { + TRACE("h0", "hti_compare_and_set: key in entry %p is \"%s\"", e, e_key & MASK(48)); + TRACE("h0", "hti_compare_and_set: lost race to install key \"%s\" in %p", key->val, e); + nbd_free(key); + return hti_compare_and_set(hti, key_hash, key_val, key_len, expected, new); // tail-call + } + TRACE("h0", "hti_compare_and_set: installed key \"%s\" in entry %p", key_val, e); + } + + // If the entry is in the middle of a copy, the copy must be completed first. + int64_t e_value = e->value; + TRACE("h0", "hti_compare_and_set: value in entry %p is %p", e, e_value); + if (EXPECT_FALSE(IS_TAGGED(e_value))) { + int did_copy = hti_copy_entry(hti, e, key_hash, ((volatile hash_table_i_t *)hti)->next); + if (did_copy) { + SYNC_ADD(&hti->num_entries_copied, 1); + } + return COPIED_VALUE; + } + + // Fail if the old value is not consistent with the caller's expectation. + int old_existed = (e_value != TOMBSTONE && e_value != DOES_NOT_EXIST); + if (EXPECT_FALSE(expected != HT_EXPECT_WHATEVER && expected != e_value)) { + if (EXPECT_FALSE(expected != (old_existed ? HT_EXPECT_EXISTS : HT_EXPECT_NOT_EXISTS))) { + TRACE("h0", "hti_compare_and_set: value expected by caller for key \"%s\" not found; " + "found value %p", key_val, e_value); + return e_value; + } + } + + // CAS the value into the entry.
Retry if it fails. + uint64_t v = SYNC_CAS(&e->value, e_value, new); + if (EXPECT_FALSE(v != e_value)) { + TRACE("h0", "hti_compare_and_set: value CAS failed; expected %p found %p", e_value, v); + return hti_compare_and_set(hti, key_hash, key_val, key_len, expected, new); // recursive tail-call + } + + // The set succeeded. Adjust the value count. + if (old_existed && new == TOMBSTONE) { + SYNC_ADD(&hti->count, -1); + } else if (!old_existed && new != TOMBSTONE) { + SYNC_ADD(&hti->count, 1); + } + + // Return the previous value. + TRACE("h0", "hti_compare_and_set: CAS succeeded; old value %p new value %p", e_value, new); + return e_value; +} + +// +static int64_t hti_get (hash_table_i_t *hti, uint32_t key_hash, const char *key_val, uint32_t key_len) { + assert(key_val); + + int is_empty; + volatile entry_t *e = hti_lookup(hti, key_hash, key_val, key_len, &is_empty); + + // When hti_lookup() returns NULL it means we hit the reprobe limit while + // searching the table. In that case, if a copy is in progress the key + // might exist in the copy. + if (EXPECT_FALSE(e == NULL)) { + if (((volatile hash_table_i_t *)hti)->next != NULL) + return hti_get(hti->next, key_hash, key_val, key_len); // recursive tail-call + return DOES_NOT_EXIST; + } + + if (is_empty) + return DOES_NOT_EXIST; + + // If the entry is being copied, finish the copy and retry on the next table. + int64_t e_value = e->value; + if (EXPECT_FALSE(IS_TAGGED(e_value))) { + if (EXPECT_FALSE(e_value != COPIED_VALUE)) { + int did_copy = hti_copy_entry(hti, e, key_hash, ((volatile hash_table_i_t *)hti)->next); + if (did_copy) { + SYNC_ADD(&hti->num_entries_copied, 1); + } + } + return hti_get(((volatile hash_table_i_t *)hti)->next, key_hash, key_val, key_len); // tail-call + } + + return (e_value == TOMBSTONE) ?
DOES_NOT_EXIST : e_value; +} + +// +int64_t ht_get (hash_table_t *ht, const char *key_val, uint32_t key_len) { + return hti_get(*ht, murmur32(key_val, key_len), key_val, key_len); +} + +// +int64_t ht_compare_and_set (hash_table_t *ht, const char *key_val, uint32_t key_len, + int64_t expected_val, int64_t new_val) { + + assert(key_val); + assert(!IS_TAGGED(new_val) && new_val != DOES_NOT_EXIST); + + hash_table_i_t *hti = *ht; + + // Help with an ongoing copy. + if (EXPECT_FALSE(hti->next != NULL)) { + + // Reserve some entries for this thread to copy. There is a race condition here because the + // fetch and add isn't atomic, but that is ok. + int x = hti->scan; + hti->scan = x + ENTRIES_PER_COPY_CHUNK; + + // scan> might be larger than the size of the table, if some thread stalls while + // copying. In that case we just wrap around to the begining and make another pass through + // the table. + volatile entry_t *e = hti->table + (x & MASK(hti->scale)); + + // Copy the entries + int num_copied = 0, i; + for (i = 0; i < ENTRIES_PER_COPY_CHUNK; ++i) { + num_copied += hti_copy_entry(hti, e++, 0, hti->next); + assert(e <= hti->table + (1 << hti->scale)); + } + if (num_copied != 0) { + SYNC_ADD(&hti->num_entries_copied, num_copied); + } + } + + // Dispose of fully copied tables. + while (hti->num_entries_copied == (1 << hti->scale)) { + assert(hti->next); + if (SYNC_CAS(ht, hti, hti->next) == hti) { + nbd_defer_free(hti); + } + hti = *ht; + } + + int64_t old_val; + uint32_t key_hash = murmur32(key_val, key_len); + while ((old_val = hti_compare_and_set(hti, key_hash, key_val, key_len, expected_val, new_val)) + == COPIED_VALUE) { + assert(hti->next); + hti = hti->next; + } + + return old_val == TOMBSTONE ? DOES_NOT_EXIST : old_val; +} + +// Remove the value in associated with . Returns the value removed, or +// DOES_NOT_EXIST if there was no value for that key. 
+int64_t ht_remove (hash_table_t *ht, const char *key_val, uint32_t key_len) {
+    hash_table_i_t *hti = *ht;
+    int64_t val;
+    uint32_t key_hash = murmur32(key_val, key_len);
+    do {
+        val = hti_compare_and_set(hti, key_hash, key_val, key_len, HT_EXPECT_WHATEVER, TOMBSTONE);
+        if (val != COPIED_VALUE)
+            return val == TOMBSTONE ? DOES_NOT_EXIST : val;
+        assert(hti->next);
+        hti = hti->next; // COPIED_VALUE: the key was copied to the next table, so retry the delete there
+        assert(hti);
+    } while (1);
+}
+
+// Returns the number of key-value pairs in ht, summed over every table in its copy chain (each count read separately).
+uint64_t ht_count (hash_table_t *ht) {
+    hash_table_i_t *hti = *ht;
+    uint64_t count = 0;
+    while (hti) {
+        count += hti->count;
+        hti = hti->next;
+    }
+    return count;
+}
+
+// Allocate and initialize a new hash table.
+hash_table_t *ht_alloc (void) {
+    hash_table_t *ht = nbd_malloc(sizeof(hash_table_t));
+    *ht = (hash_table_i_t *)hti_alloc(ht, MIN_SCALE);
+    return ht;
+}
+
+// Free ht and every table in its copy chain. Uses nbd_free (not nbd_defer_free), so presumably no other thread may still be using ht.
+void ht_free (hash_table_t *ht) {
+    hash_table_i_t *hti = *ht;
+    do {
+        hash_table_i_t *next = hti->next;
+        nbd_free(hti);
+        hti = next;
+    } while (hti);
+    nbd_free(ht);
+}
diff --git a/struct/ht_test.c b/struct/ht_test.c
new file mode 100644
index 0000000..61b2bfc
--- /dev/null
+++ b/struct/ht_test.c
@@ -0,0 +1,181 @@
+/*
+ * Written by Josh Dybnis and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ */
+#include <stdio.h>   // NOTE(review): header name was stripped in extraction; reconstructed from usage (printf, snprintf, perror) - confirm
+#include <pthread.h> // NOTE(review): header name was stripped in extraction; reconstructed from usage (pthread_create) - confirm
+#include "CuTest.h"
+#include "common.h"
+#include "nbd.h"
+#include "rcu.h"
+#include "ht.h"
+#include "mem.h"
+
+#define ASSERT_EQUAL(x, y) CuAssertIntEquals(tc, x, y)
+
+typedef struct worker_data {
+    int id;
+    CuTest *tc;
+    hash_table_t *ht;
+    int *wait;
+} worker_data_t;
+
+int64_t ht_set (hash_table_t *ht, const char *key, uint32_t key_len, int64_t val) { // store val unconditionally
+    return ht_compare_and_set(ht, key, key_len, HT_EXPECT_WHATEVER, val);
+}
+
+int64_t ht_add (hash_table_t *ht, const char *key, uint32_t key_len, int64_t val) { // store val only if key has no value
+    return ht_compare_and_set(ht, key, key_len, HT_EXPECT_NOT_EXISTS, val);
+}
+
+int64_t ht_replace (hash_table_t *ht, const char *key, uint32_t key_len, int64_t val) { // store val only if key already has a value
+    return ht_compare_and_set(ht, key, key_len, HT_EXPECT_EXISTS, val);
+}
+
+// Test some basic stuff; add a few keys, remove a few keys
+void basic_test (CuTest* tc) {
+
+    hash_table_t *ht = ht_alloc();
+
+    ASSERT_EQUAL( 0, ht_count(ht) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_add(ht,"a",2,10) );
+    ASSERT_EQUAL( 1, ht_count(ht) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_add(ht,"b",2,20) );
+    ASSERT_EQUAL( 2, ht_count(ht) );
+    ASSERT_EQUAL( 20, ht_get(ht,"b",2) );
+    ASSERT_EQUAL( 10, ht_set(ht,"a",2,11) );
+    ASSERT_EQUAL( 20, ht_set(ht,"b",2,21) );
+    ASSERT_EQUAL( 2, ht_count(ht) );
+    ASSERT_EQUAL( 21, ht_add(ht,"b",2,22) );
+    ASSERT_EQUAL( 11, ht_remove(ht,"a",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"a",2) );
+    ASSERT_EQUAL( 1, ht_count(ht) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_remove(ht,"a",2) );
+    ASSERT_EQUAL( 21, ht_remove(ht,"b",2) );
+    ASSERT_EQUAL( 0, ht_count(ht) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_remove(ht,"b",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_remove(ht,"c",2) );
+    ASSERT_EQUAL( 0, ht_count(ht) );
+
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_add(ht,"d",2,40) );
+    ASSERT_EQUAL( 40, ht_get(ht,"d",2) );
+    ASSERT_EQUAL( 1, ht_count(ht) );
+    ASSERT_EQUAL( 40, ht_remove(ht,"d",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"d",2) );
+    ASSERT_EQUAL( 0, ht_count(ht) );
+
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_replace(ht,"d",2,10) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"d",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_set(ht,"d",2,40) );
+    ASSERT_EQUAL( 40, ht_replace(ht,"d",2,41) );
+    ASSERT_EQUAL( 41, ht_get(ht,"d",2) );
+    ASSERT_EQUAL( 41, ht_remove(ht,"d",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"d",2) );
+    ASSERT_EQUAL( 0, ht_count(ht) );
+
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_replace(ht,"b",2,20) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"b",2) );
+    // ht_set on an absent key inserts it, after which ht_replace succeeds
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_set(ht,"b",2,20) );
+    ASSERT_EQUAL( 20, ht_replace(ht,"b",2,21) );
+    ASSERT_EQUAL( 21, ht_get(ht,"b",2) );
+    ASSERT_EQUAL( 21, ht_remove(ht,"b",2) );
+    ASSERT_EQUAL( DOES_NOT_EXIST, ht_get(ht,"b",2) );
+    ASSERT_EQUAL( 0, ht_count(ht) );
+
+    ht_free(ht);
+
+    // In a quiescent state; it is safe to free.
+    rcu_update();
+}
+
+void *simple_worker (void *arg) {
+    worker_data_t *wd = (worker_data_t *)arg;
+    hash_table_t *ht = wd->ht;
+    CuTest* tc = wd->tc;
+    uint64_t d = wd->id;
+    int iters = 20000;
+
+    nbd_thread_init(d);
+    SYNC_ADD(wd->wait, -1);
+    do { } while (*(volatile int *)wd->wait); // spin until all workers are ready; reading through volatile int keeps the load inside the loop
+
+    int i, j;
+    for (j = 0; j < 10; ++j) {
+        for (i = d; i < iters; i+=2) {
+            char key[8];
+            snprintf(key, sizeof(key), "k%d", i);
+            ASSERT_EQUAL( DOES_NOT_EXIST, ht_add(ht, key, strlen(key)+1, d+1) );
+        }
+        for (i = d; i < iters; i+=2) {
+            char key[8];
+            snprintf(key, sizeof(key), "k%d", i);
+            ASSERT_EQUAL( d+1, ht_remove(ht, key, strlen(key)+1) );
+        }
+        rcu_update();
+    }
+    return NULL;
+}
+
+// Do some simple concurrent testing
+void simple_add_remove (CuTest* tc) {
+
+    pthread_t thread[2];
+    worker_data_t wd[2];
+    int wait = 2;
+    hash_table_t *ht = ht_alloc();
+
+    // In 2 threads, add & remove even & odd elements concurrently
+    int i;
+    for (i = 0; i < 2; ++i) {
+        wd[i].id = i;
+        wd[i].tc = tc;
+        wd[i].ht = ht;
+        wd[i].wait = &wait;
+        int rc = pthread_create(thread + i, NULL, simple_worker, wd + i);
+        if (rc != 0) { perror("pthread_create"); return; }
+    }
+    for (i = 0; i < 2; ++i) {
+        pthread_join(thread[i], NULL);
+    }
+
+    // In the end, all members should be removed
+    ASSERT_EQUAL( 0, ht_count(ht) );
+
+    // All workers have been joined, so we are in a quiescent state; it is safe to free.
+    ht_free(ht);
+}
+
+
+void *inserter_worker (void *arg) {
+    //pthread_t thread[NUM_THREADS];
+
+    //hash_table_t *ht = ht_alloc();
+    return NULL;
+}
+
+// Concurrent insertion
+void concurrent_insert (CuTest* tc) {
+}
+
+int main (void) {
+
+    nbd_init();
+    nbd_thread_init(0);
+
+    //lwt_set_trace_level("h4");
+
+    // Create and run test suite
+    CuString *output = CuStringNew();
+    CuSuite* suite = CuSuiteNew();
+
+    SUITE_ADD_TEST(suite, basic_test);
+    SUITE_ADD_TEST(suite, simple_add_remove);
+
+    CuSuiteRun(suite);
+    CuSuiteSummary(suite, output);
+    CuSuiteDetails(suite, output);
+    printf("%s\n", output->buffer);
+
+    return suite->failCount != 0; // nonzero exit status when any test failed
+}
diff --git a/struct/list.c b/struct/list.c
new file mode 100644
index 0000000..28c15aa
--- /dev/null
+++ b/struct/list.c
@@ -0,0 +1,278 @@
+/*
+ * Written by Josh Dybnis and released to the public domain, as explained at
+ * http://creativecommons.org/licenses/publicdomain
+ *
+ * Harris-Michael lock-free list-based set
+ * http://www.research.ibm.com/people/m/michael/spaa-2002.pdf
+ */
+#include <stdio.h>  // NOTE(review): header name was stripped in extraction; reconstructed from usage (printf, fflush) - confirm
+#include <string.h> // NOTE(review): header name was stripped in extraction; reconstructed from usage (memset) - confirm
+#include <limits.h> // NOTE(review): header name was stripped in extraction; reconstructed from usage (INT_MIN, INT_MAX) - confirm
+
+#include "common.h"
+#include "lwt.h"
+#include "rcu.h"
+#include "mem.h"
+
+#define NUM_ITERATIONS 10000000
+
+#define PLACE_MARK(x) (((size_t)(x))|1)
+#define CLEAR_MARK(x) (((size_t)(x))&~(size_t)1)
+#define IS_MARKED(x) (((size_t)(x))&1) // fully parenthesized so the mask is applied before any surrounding operator (e.g. !IS_MARKED(p))
+
+typedef struct node {
+    struct node *next;
+    int key;
+} node_t;
+
+typedef struct list {
+    node_t head[1];
+    node_t last;
+} list_t;
+
+static void list_node_init (node_t *item, int key)
+{
+    memset(item, 0, sizeof(node_t));
+    item->key = key;
+}
+
+node_t *list_node_alloc (int key)
+{
+    node_t *item = (node_t *)nbd_malloc(sizeof(node_t));
+    list_node_init(item, key);
+    return item;
+}
+
+list_t *list_alloc (void)
+{
+    list_t *list = (list_t *)nbd_malloc(sizeof(list_t));
+    list_node_init(list->head, INT_MIN);
+    list_node_init(&list->last, INT_MAX);
+    list->head->next = &list->last;
+    return list;
+}
+
+static void find_pred_and_item (node_t **pred_ptr, node_t **item_ptr, list_t *list, int key)
+{
+    node_t *pred = list->head;
+    node_t *item = list->head->next; // head is never removed
+    TRACE("l3", "find_pred_and_item: searching for key %llu in list (head is %p)", key, pred);
+#ifndef NDEBUG
+    int count = 0;
+#endif
+    do {
+        // skip removed items
+        node_t *other, *next = item->next;
+        TRACE("l3", "find_pred_and_item: visiting item %p (next is %p)", item, next);
+        while (EXPECT_FALSE(IS_MARKED(next))) {
+
+            // assist in unlinking partially removed items
+            if ((other = SYNC_CAS(&pred->next, item, CLEAR_MARK(next))) != item)
+            {
+                TRACE("l3", "find_pred_and_item: failed to unlink item from pred %p, pred's next pointer was changed to %p", pred, other);
+                return find_pred_and_item(pred_ptr, item_ptr, list, key); // retry
+            }
+
+            assert(count++ < 18);
+            item = (node_t *)CLEAR_MARK(next);
+            next = item->next;
+            TRACE("l3", "find_pred_and_item: unlinked item, %p is the new item (next is %p)", item, next);
+        }
+
+        if (item->key >= key) {
+            *pred_ptr = pred;
+            *item_ptr = item;
+            TRACE("l3", "find_pred_and_item: key found, returning pred %p and item %p", pred, item);
+            return;
+        }
+
+        assert(count++ < 18);
+        pred = item;
+        item = next;
+
+    } while (1);
+}
+
+int list_insert (list_t *list, node_t *item)
+{
+    TRACE("l3", "list_insert: inserting %p (with key %llu)", item, item->key);
+    node_t *pred, *next, *other = (node_t *)-1;
+    do {
+        if (other != (node_t *)-1) {
+            TRACE("l3", "list_insert: failed to swap item into list; pred's next was changed to %p", other, 0);
+        }
+        find_pred_and_item(&pred, &next, list, item->key);
+
+        // fail if item already exists in list
+        if (next->key == item->key)
+        {
+            TRACE("l3", "list_insert: insert failed item with key already exists %p", next, 0);
+            return 0;
+        }
+
+        item->next = next;
+        TRACE("l3", "list_insert: attempting to insert item between %p and %p", pred, next);
+
+    } while ((other = __sync_val_compare_and_swap(&pred->next, next, item)) != next);
+
+    TRACE("l3", "list_insert: insert was successful", 0, 0);
+
+    // success
+    return 1;
+}
+
+node_t *list_remove (list_t *list, int key)
+{
+    node_t *pred, *item, *next;
+
+    TRACE("l3", "list_remove: removing item with key %llu", key, 0);
+    find_pred_and_item(&pred, &item, list, key);
+    if (item->key != key)
+    {
+        TRACE("l3", "list_remove: remove failed, key does not exist in list", 0, 0);
+        return NULL;
+    }
+
+    // Mark removed, must be atomic. If multiple threads try to remove the
+    // same item only one of them should succeed
+    next = item->next;
+    node_t *other = (node_t *)-1;
+    if (IS_MARKED(next) || (other = __sync_val_compare_and_swap(&item->next, next, PLACE_MARK(next))) != next) {
+        if (other == (node_t *)-1) {
+            TRACE("l3", "list_remove: retry; %p is already marked for removal (it's next pointer is %p)", item, next);
+        } else {
+            TRACE("l3", "list_remove: retry; failed to mark %p for removal; its next pointer was changed to %p", item, other);
+        }
+        return list_remove(list, key); // retry
+    }
+
+    // Remove from list
+    TRACE("l3", "list_remove: link item's pred %p to it's successor %p", pred, next);
+    if ((other = __sync_val_compare_and_swap(&pred->next, item, next)) != item) {
+        TRACE("l3", "list_remove: link failed; pred's link changed from %p to %p", item, other);
+
+        // make sure item gets unlinked before returning it
+        node_t *d1, *d2;
+        find_pred_and_item(&d1, &d2, list, key);
+    } else {
+        TRACE("l3", "list_remove: link succeeded; pred's link changed from %p to %p", item, next);
+    }
+
+    return item;
+}
+
+void list_print (list_t *list)
+{
+    node_t *item;
+    item = list->head;
+    while (item) {
+        printf("%d ", item->key);
+        fflush(stdout);
+        item = item->next;
+    }
+    printf("\n");
+}
+
+#ifdef MAKE_list_test
+#include <errno.h>   // NOTE(review): header name was stripped in extraction; reconstructed from usage (errno) - confirm
+#include <pthread.h> // NOTE(review): header name was stripped in extraction; reconstructed from usage (pthread_create) - confirm
+#include "nbd.h"
+
+static volatile int wait_;
+static long num_threads_;
+static list_t *list_;
+
+void *worker (void *arg)
+{
+    int id = (int)(size_t)arg;
+    nbd_thread_init(id);
+
+    unsigned int rand_seed = id+1;//rdtsc_l();
+
+    // Wait for all the worker threads to be ready.
+    __sync_fetch_and_add(&wait_, -1);
+    do {} while (wait_);
+    __asm__ __volatile__("lfence");
+
+    int i;
+    for (i = 0; i < NUM_ITERATIONS/num_threads_; ++i) {
+        int n = rand_r(&rand_seed);
+        int key = (n & 0xF) + 1;
+        if (n & (1 << 8)) {
+            node_t *item = list_node_alloc(key);
+            int success = list_insert(list_, item);
+            if (!success) {
+                nbd_free(item);
+            }
+        } else {
+            node_t *item = list_remove(list_, key);
+            if (item) {
+                nbd_defer_free(item);
+            }
+        }
+
+        rcu_update();
+    }
+
+    return NULL;
+}
+
+int main (int argc, char **argv)
+{
+    nbd_init();
+    //lwt_set_trace_level("m0l0");
+
+    char* program_name = argv[0];
+    pthread_t thread[MAX_NUM_THREADS];
+
+    if (argc > 2) {
+        fprintf(stderr, "Usage: %s num_threads\n", program_name);
+        return -1;
+    }
+
+    num_threads_ = 2;
+    if (argc == 2)
+    {
+        errno = 0;
+        num_threads_ = strtol(argv[1], NULL, 10);
+        if (errno) {
+            fprintf(stderr, "%s: Invalid argument for number of threads\n", program_name);
+            return -1;
+        }
+        if (num_threads_ <= 0) {
+            fprintf(stderr, "%s: Number of threads must be at least 1\n", program_name);
+            return -1;
+        }
+        if (num_threads_ > MAX_NUM_THREADS) {
+            fprintf(stderr, "%s: Number of threads cannot be more than %d\n", program_name, MAX_NUM_THREADS);
+            return -1;
+        }
+    }
+
+    list_ = list_alloc();
+
+    struct timeval tv1, tv2;
+    gettimeofday(&tv1, NULL);
+
+    __asm__ __volatile__("sfence");
+    wait_ = num_threads_;
+
+    int i;
+    for (i = 0; i < num_threads_; ++i) {
+        int rc = pthread_create(thread + i, NULL, worker, (void*)(size_t)i);
+        if (rc != 0) { perror("pthread_create"); return rc; }
+    }
+
+    for (i = 0; i < num_threads_; ++i) {
+        pthread_join(thread[i], NULL);
+    }
+
+    gettimeofday(&tv2, NULL);
+    int ms = (int)(1000000*(tv2.tv_sec - tv1.tv_sec) + tv2.tv_usec - tv1.tv_usec) / 1000;
+    printf("Th:%ld Time:%dms\n", num_threads_, ms);
+    list_print(list_);
+    lwt_dump("lwt.out");
+
+    return 0;
+}
+#endif//list_test
diff --git a/tags b/tags
new file mode 100644
index 0000000..7f019cb
--- /dev/null
+++ b/tags
@@
-0,0 +1,335 @@ +!_TAG_FILE_FORMAT 2 /extended format; --format=1 will not append ;" to lines/ +!_TAG_FILE_SORTED 1 /0=unsorted, 1=sorted, 2=foldcase/ +!_TAG_PROGRAM_AUTHOR Darren Hiebert /dhiebert@users.sourceforge.net/ +!_TAG_PROGRAM_NAME Exuberant Ctags // +!_TAG_PROGRAM_URL http://ctags.sourceforge.net /official site/ +!_TAG_PROGRAM_VERSION 5.7 // +ASSERT_EQUAL struct/ht_test.c /^#define ASSERT_EQUAL(/;" d file: +CACHE_LINE_SIZE include/common.h /^#define CACHE_LINE_SIZE /;" d +CAT include/common.h /^#define CAT(/;" d +CFLAGS makefile /^CFLAGS := -g -Wall -Werror -std=c99 -m64 -fnested-functions -fwhole-program -combine -03 -DENABLE_TRACE$/;" m +CLEAR_MARK struct/list.c /^#define CLEAR_MARK(/;" d file: +COPIED_VALUE struct/ht.c /^#define COPIED_VALUE /;" d file: +CU_ALLOC include/CuTest.h /^#define CU_ALLOC(/;" d +CU_TEST_H include/CuTest.h /^#define CU_TEST_H$/;" d +CuAssert include/CuTest.h /^#define CuAssert(/;" d +CuAssertDblEquals include/CuTest.h /^#define CuAssertDblEquals(/;" d +CuAssertDblEquals_LineMsg util/CuTest.c /^void CuAssertDblEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, $/;" f +CuAssertDblEquals_Msg include/CuTest.h /^#define CuAssertDblEquals_Msg(/;" d +CuAssertIntEquals include/CuTest.h /^#define CuAssertIntEquals(/;" d +CuAssertIntEquals_LineMsg util/CuTest.c /^void CuAssertIntEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, $/;" f +CuAssertIntEquals_Msg include/CuTest.h /^#define CuAssertIntEquals_Msg(/;" d +CuAssertPtrEquals include/CuTest.h /^#define CuAssertPtrEquals(/;" d +CuAssertPtrEquals_LineMsg util/CuTest.c /^void CuAssertPtrEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, $/;" f +CuAssertPtrEquals_Msg include/CuTest.h /^#define CuAssertPtrEquals_Msg(/;" d +CuAssertPtrNotNull include/CuTest.h /^#define CuAssertPtrNotNull(/;" d +CuAssertPtrNotNullMsg include/CuTest.h /^#define CuAssertPtrNotNullMsg(/;" d +CuAssertStrEquals include/CuTest.h /^#define 
CuAssertStrEquals(/;" d +CuAssertStrEquals_LineMsg util/CuTest.c /^void CuAssertStrEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, $/;" f +CuAssertStrEquals_Msg include/CuTest.h /^#define CuAssertStrEquals_Msg(/;" d +CuAssertTrue include/CuTest.h /^#define CuAssertTrue(/;" d +CuAssert_Line util/CuTest.c /^void CuAssert_Line(CuTest* tc, const char* file, int line, const char* message, int condition)$/;" f +CuFail include/CuTest.h /^#define CuFail(/;" d +CuFailInternal util/CuTest.c /^static void CuFailInternal(CuTest* tc, const char* file, int line, CuString* string)$/;" f file: +CuFail_Line util/CuTest.c /^void CuFail_Line(CuTest* tc, const char* file, int line, const char* message2, const char* message)$/;" f +CuStrAlloc util/CuTest.c /^char* CuStrAlloc(int size)$/;" f +CuStrCopy util/CuTest.c /^char* CuStrCopy(const char* old)$/;" f +CuString include/CuTest.h /^} CuString;$/;" t typeref:struct:__anon1 +CuStringAppend util/CuTest.c /^void CuStringAppend(CuString* str, const char* text)$/;" f +CuStringAppendChar util/CuTest.c /^void CuStringAppendChar(CuString* str, char ch)$/;" f +CuStringAppendFormat util/CuTest.c /^void CuStringAppendFormat(CuString* str, const char* format, ...)$/;" f +CuStringInit util/CuTest.c /^void CuStringInit(CuString* str)$/;" f +CuStringInsert util/CuTest.c /^void CuStringInsert(CuString* str, const char* text, int pos)$/;" f +CuStringNew util/CuTest.c /^CuString* CuStringNew(void)$/;" f +CuStringResize util/CuTest.c /^void CuStringResize(CuString* str, int newSize)$/;" f +CuSuite include/CuTest.h /^} CuSuite;$/;" t typeref:struct:__anon2 +CuSuiteAdd util/CuTest.c /^void CuSuiteAdd(CuSuite* testSuite, CuTest *testCase)$/;" f +CuSuiteAddSuite util/CuTest.c /^void CuSuiteAddSuite(CuSuite* testSuite, CuSuite* testSuite2)$/;" f +CuSuiteDetails util/CuTest.c /^void CuSuiteDetails(CuSuite* testSuite, CuString* details)$/;" f +CuSuiteInit util/CuTest.c /^void CuSuiteInit(CuSuite* testSuite)$/;" f +CuSuiteNew 
util/CuTest.c /^CuSuite* CuSuiteNew(void)$/;" f +CuSuiteRun util/CuTest.c /^void CuSuiteRun(CuSuite* testSuite)$/;" f +CuSuiteSummary util/CuTest.c /^void CuSuiteSummary(CuSuite* testSuite, CuString* summary)$/;" f +CuTest include/CuTest.h /^struct CuTest$/;" s +CuTest include/CuTest.h /^typedef struct CuTest CuTest;$/;" t typeref:struct:CuTest +CuTestInit util/CuTest.c /^void CuTestInit(CuTest* t, const char* name, TestFunction function)$/;" f +CuTestNew util/CuTest.c /^CuTest* CuTestNew(const char* name, TestFunction function)$/;" f +CuTestRun util/CuTest.c /^void CuTestRun(CuTest* tc)$/;" f +DECLARE_THREAD_LOCAL include/tls.h /^#define DECLARE_THREAD_LOCAL(/;" d +DOES_NOT_EXIST include/ht.h /^#define DOES_NOT_EXIST /;" d +ENTRIES_PER_BUCKET struct/ht.c /^#define ENTRIES_PER_BUCKET /;" d file: +ENTRIES_PER_COPY_CHUNK struct/ht.c /^#define ENTRIES_PER_COPY_CHUNK /;" d file: +EXES makefile /^EXES := $(TESTS)$/;" m +EXPECT_FALSE include/common.h /^#define EXPECT_FALSE(/;" d +EXPECT_TRUE include/common.h /^#define EXPECT_TRUE(/;" d +FALSE include/common.h /^#define FALSE /;" d +GET_SCALE util/mem.c /^#define GET_SCALE(/;" d file: +GlobalVersion txn/txn.c /^uint64_t GlobalVersion = 1;$/;" v +HT_EXPECT_EXISTS include/ht.h /^#define HT_EXPECT_EXISTS /;" d +HT_EXPECT_NOT_EXISTS include/ht.h /^#define HT_EXPECT_NOT_EXISTS /;" d +HT_EXPECT_WHATEVER include/ht.h /^#define HT_EXPECT_WHATEVER /;" d +HUGE_STRING_LEN include/CuTest.h /^#define HUGE_STRING_LEN /;" d +INCS makefile /^INCS := $(addprefix -I, include)$/;" m +INITIAL_WRITES_SIZE txn/txn.c /^#define INITIAL_WRITES_SIZE /;" d file: +INIT_THREAD_LOCAL include/tls.h /^#define INIT_THREAD_LOCAL /;" d +INIT_THREAD_LOCAL include/tls.h /^#define INIT_THREAD_LOCAL(/;" d +IS_MARKED struct/list.c /^#define IS_MARKED(/;" d file: +IS_TAGGED include/common.h /^#define IS_TAGGED(/;" d +LOCALIZE include/tls.h /^#define LOCALIZE /;" d +LOCALIZE include/tls.h /^#define LOCALIZE(/;" d +LWT_BUFFER_MASK include/lwt.h /^#define 
LWT_BUFFER_MASK /;" d +LWT_BUFFER_SCALE include/lwt.h /^#define LWT_BUFFER_SCALE /;" d +LWT_BUFFER_SIZE include/lwt.h /^#define LWT_BUFFER_SIZE /;" d +LWT_H include/lwt.h /^#define LWT_H$/;" d +MASK include/common.h /^#define MASK(/;" d +MAX_BUCKETS_TO_PROBE struct/ht.c /^#define MAX_BUCKETS_TO_PROBE /;" d file: +MAX_NUM_THREADS struct/list.c /^#define MAX_NUM_THREADS /;" d file: +MAX_SCALE util/mem.c /^#define MAX_SCALE /;" d file: +MAX_TEST_CASES include/CuTest.h /^#define MAX_TEST_CASES /;" d +MEM_H include/mem.h /^#define MEM_H$/;" d +MIN_SCALE struct/ht.c /^#define MIN_SCALE /;" d file: +MinActiveTxnVersion txn/txn.c /^uint64_t MinActiveTxnVersion = 0;$/;" v +NBD_H include/nbd.h /^#define NBD_H$/;" d +NBHT_H include/ht.h /^#define NBHT_H$/;" d +NUM_ITERATIONS struct/list.c /^#define NUM_ITERATIONS /;" d file: +NUM_ITERATIONS util/rcu.c /^#define NUM_ITERATIONS /;" d file: +NUM_THREADS include/common.h /^#define NUM_THREADS /;" d +ON_EXIT_SCOPE include/common.h /^#define ON_EXIT_SCOPE(/;" d +ON_EXIT_SCOPE_I include/common.h /^#define ON_EXIT_SCOPE_I(/;" d +PLACE_MARK struct/list.c /^#define PLACE_MARK(/;" d file: +RCU_POST_THRESHOLD util/rcu.c /^#define RCU_POST_THRESHOLD /;" d file: +RCU_QUEUE_SCALE util/rcu.c /^#define RCU_QUEUE_SCALE /;" d file: +SET_THREAD_LOCAL include/tls.h /^#define SET_THREAD_LOCAL /;" d +SET_THREAD_LOCAL include/tls.h /^#define SET_THREAD_LOCAL(/;" d +STRING_INC include/CuTest.h /^#define STRING_INC /;" d +STRING_MAX include/CuTest.h /^#define STRING_MAX /;" d +STRIP_TAG include/common.h /^#define STRIP_TAG(/;" d +SUITE_ADD_TEST include/CuTest.h /^#define SUITE_ADD_TEST(/;" d +SUPERBLOCK_SCALE util/mem.c /^#define SUPERBLOCK_SCALE /;" d file: +SUPERBLOCK_SIZE util/mem.c /^#define SUPERBLOCK_SIZE /;" d file: +SYNC_ADD include/common.h /^#define SYNC_ADD /;" d +SYNC_CAS include/common.h /^#define SYNC_CAS /;" d +SYNC_FETCH_AND_OR include/common.h /^#define SYNC_FETCH_AND_OR /;" d +SYNC_SWAP include/common.h /^#define SYNC_SWAP /;" d +TAG 
include/common.h /^#define TAG /;" d +TAG_VALUE include/common.h /^#define TAG_VALUE(/;" d +TESTS makefile /^TESTS := $(addsuffix _test, $(addprefix output\/,rcu list ht txn))$/;" m +TLS_H include/tls.h /^#define TLS_H$/;" d +TOMBSTONE struct/ht.c /^#define TOMBSTONE /;" d file: +TRACE include/lwt.h /^#define TRACE(/;" d +TRUE include/common.h /^#define TRUE /;" d +TXN_ABORTED txn/txn.c /^typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;$/;" e enum:__anon5 file: +TXN_BLIND_WRITE include/txn.h /^typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_t;$/;" e enum:__anon3 +TXN_DIRTY_READ include/txn.h /^typedef enum { TXN_DIRTY_READ, TXN_READ_COMMITTED, TXN_REPEATABLE_READ } txn_isolation_t;$/;" e enum:__anon4 +TXN_READ_COMMITTED include/txn.h /^typedef enum { TXN_DIRTY_READ, TXN_READ_COMMITTED, TXN_REPEATABLE_READ } txn_isolation_t;$/;" e enum:__anon4 +TXN_READ_ONLY include/txn.h /^typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_t;$/;" e enum:__anon3 +TXN_READ_WRITE include/txn.h /^typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_t;$/;" e enum:__anon3 +TXN_REPEATABLE_READ include/txn.h /^typedef enum { TXN_DIRTY_READ, TXN_READ_COMMITTED, TXN_REPEATABLE_READ } txn_isolation_t;$/;" e enum:__anon4 +TXN_RUNNING txn/txn.c /^typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;$/;" e enum:__anon5 file: +TXN_VALIDATED txn/txn.c /^typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;$/;" e enum:__anon5 file: +TXN_VALIDATING txn/txn.c /^typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;$/;" e enum:__anon5 file: +TestFunction include/CuTest.h /^typedef void (*TestFunction)(CuTest *);$/;" t +UNDETERMINED_VERSION txn/txn.c /^#define UNDETERMINED_VERSION /;" d file: +UPDATE_TYPE_DELETE txn/txn.c /^typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;$/;" e 
enum:__anon6 file: +UPDATE_TYPE_PUT txn/txn.c /^typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;$/;" e enum:__anon6 file: +UTIL_SRCS makefile /^UTIL_SRCS := util\/rcu.c util\/lwt.c util\/CuTest.c util\/mem.c util\/nbd.c$/;" m +YAMS_H include/common.h /^#define YAMS_H$/;" d +access txn/txn.c /^ txn_access_t access;$/;" m struct:txn file: +alloc_update_rec txn/txn.c /^update_rec_t *alloc_update_rec (void) {$/;" f +basic_test struct/ht_test.c /^void basic_test (CuTest* tc) {$/;" f +block util/mem.c /^typedef struct block {$/;" s file: +block_t util/mem.c /^} block_t;$/;" t typeref:struct:block file: +buf_count_ util/lwt.c /^static int buf_count_ = 0;$/;" v file: +buffer include/CuTest.h /^ char* buffer;$/;" m struct:__anon1 +concurrent_insert struct/ht_test.c /^void concurrent_insert (CuTest* tc) {$/;" f +concurrent_simple struct/ht_test.c /^void concurrent_simple (CuTest* tc) {$/;" f +count include/CuTest.h /^ int count;$/;" m struct:__anon2 +count struct/ht.c /^ int count; \/\/ TODO: make these counters distributed$/;" m struct:hash_table_i file: +deferred_ struct/ht.c /^static hash_table_i_t *deferred_ = NULL;$/;" v file: +dequeue struct/queue.c /^queue_elem_t dequeue (queue_t *q)$/;" f +dump_buffer util/lwt.c /^static void dump_buffer (FILE *file, int thread_id, uint64_t offset)$/;" f file: +dump_record util/lwt.c /^static inline void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset)$/;" f file: +enqueue struct/queue.c /^void enqueue (queue_t *q, queue_elem_t *e)$/;" f +entry_t struct/ht.c /^} entry_t;$/;" t typeref:struct:ht_entry file: +failCount include/CuTest.h /^ int failCount;$/;" m struct:__anon2 +failed include/CuTest.h /^ int failed;$/;" m struct:CuTest +fifo util/rcu.c /^typedef struct fifo {$/;" s file: +fifo_alloc util/rcu.c /^static fifo_t *fifo_alloc(int scale)$/;" f file: +fifo_dequeue util/rcu.c /^static void *fifo_dequeue (fifo_t *q)$/;" f file: +fifo_enqueue util/rcu.c /^static void fifo_enqueue (fifo_t 
*q, void *x)$/;" f file: +fifo_index util/rcu.c /^static uint32_t fifo_index (fifo_t *q, uint32_t i)$/;" f file: +fifo_t util/rcu.c /^} fifo_t;$/;" t typeref:struct:fifo file: +find_pred_and_item struct/list.c /^static void find_pred_and_item (node_t **pred_ptr, node_t **item_ptr, list_t *list, int key)$/;" f file: +flag_mask_ util/lwt.c /^uint64_t flag_mask_ = 0;$/;" v +flags_ util/lwt.c /^static const char *flags_ = "";$/;" v file: +format include/lwt.h /^ const char *format;$/;" m struct:lwt_record +free include/common.h /^#define free /;" d +free_list_ util/mem.c /^static block_t free_list_[NUM_THREADS][MAX_SCALE+1][NUM_THREADS];$/;" v file: +function include/CuTest.h /^ TestFunction function;$/;" m struct:CuTest +get_new_superblock util/mem.c /^static void *get_new_superblock (int scale) {$/;" f file: +get_next_ndx struct/ht.c /^static inline int get_next_ndx(int old_ndx, uint32_t key_hash, int ht_scale) {$/;" f file: +hash_table_i struct/ht.c /^typedef struct hash_table_i {$/;" s file: +hash_table_i_t struct/ht.c /^} hash_table_i_t;$/;" t typeref:struct:hash_table_i file: +hash_table_t include/ht.h /^typedef struct hash_table_i *hash_table_t;$/;" t typeref:struct:hash_table_i +head include/lwt.h /^ uint32_t head;$/;" m struct:lwt_buffer +head struct/list.c /^ node_t head[1];$/;" m struct:list file: +head struct/queue.c /^ queue_elem_t *head;$/;" m struct:queue file: +head util/rcu.c /^ node_t *head;$/;" m struct:lifo file: +head util/rcu.c /^ uint32_t head;$/;" m struct:fifo file: +header util/mem.c /^typedef struct header {$/;" s file: +header_t util/mem.c /^} header_t;$/;" t typeref:struct:header file: +ht struct/ht.c /^ hash_table_t *ht; \/\/ parent ht;$/;" m struct:hash_table_i file: +ht struct/ht_test.c /^ hash_table_t *ht;$/;" m struct:worker_data file: +ht txn/txn.c /^ hash_table_t *ht;$/;" m struct:txn file: +ht_add struct/ht_test.c /^int64_t ht_add (hash_table_t *ht, const char *key, int64_t val) {$/;" f +ht_alloc struct/ht.c /^hash_table_t *ht_alloc 
(void) {$/;" f +ht_compare_and_set struct/ht.c /^int64_t ht_compare_and_set (hash_table_t *ht, const char *key_string, $/;" f +ht_count struct/ht.c /^uint64_t ht_count (hash_table_t *ht) {$/;" f +ht_entry struct/ht.c /^typedef struct ht_entry {$/;" s file: +ht_free struct/ht.c /^void ht_free (hash_table_t *ht) {$/;" f +ht_free_deferred struct/ht.c /^void ht_free_deferred (void) {$/;" f +ht_get struct/ht.c /^int64_t ht_get (hash_table_t *ht, const char *key_string) {$/;" f +ht_key_equals struct/ht.c /^static inline int ht_key_equals (uint64_t a, uint32_t b_hash, const char *b_value) {$/;" f file: +ht_remove struct/ht.c /^int64_t ht_remove (hash_table_t *ht, const char *key_string) {$/;" f +ht_replace struct/ht_test.c /^int64_t ht_replace (hash_table_t *ht, const char *key, int64_t val) {$/;" f +ht_set struct/ht_test.c /^int64_t ht_set (hash_table_t *ht, const char *key, int64_t val) {$/;" f +ht_test_SRCS makefile /^ht_test_SRCS := $(UTIL_SRCS) struct\/ht.c struct\/ht_test.c$/;" m +hti_alloc struct/ht.c /^static hash_table_i_t *hti_alloc (hash_table_t *parent, int scale) {$/;" f file: +hti_compare_and_set struct/ht.c /^static int64_t hti_compare_and_set (hash_table_i_t *hti, uint32_t key_hash, const char *key_string, $/;" f file: +hti_copy_entry struct/ht.c /^static int hti_copy_entry (hash_table_i_t *old_hti, volatile entry_t *old_e, uint32_t key_hash, $/;" f file: +hti_defer_free struct/ht.c /^static void hti_defer_free (hash_table_i_t *hti) {$/;" f file: +hti_get struct/ht.c /^static int64_t hti_get (hash_table_i_t *hti, uint32_t key_hash, const char *key_string) {$/;" f file: +hti_lookup struct/ht.c /^static volatile entry_t *hti_lookup (hash_table_i_t *hti, uint32_t key_hash, const char *key_string, int *is_empty) {$/;" f file: +hti_start_copy struct/ht.c /^static void hti_start_copy (hash_table_i_t *hti) {$/;" f file: +id struct/ht_test.c /^ int id;$/;" m struct:worker_data file: +inserter_worker struct/ht_test.c /^void *inserter_worker (void *arg) {$/;" f 
+int64_t include/common.h /^typedef long long int64_t;$/;" t +isolation txn/txn.c /^ txn_isolation_t isolation;$/;" m struct:txn file: +jumpBuf include/CuTest.h /^ jmp_buf *jumpBuf;$/;" m struct:CuTest +key struct/ht.c /^ int64_t key;$/;" m struct:ht_entry file: +key struct/list.c /^ int key;$/;" m struct:node file: +key txn/txn.c /^ struct { const char *key; update_rec_t *rec; } *writes;$/;" m struct:txn::__anon7 file: +last struct/list.c /^ node_t last;$/;" m struct:list file: +last_posted_ util/rcu.c /^static uint64_t **last_posted_;$/;" v file: +length include/CuTest.h /^ int length;$/;" m struct:__anon1 +lifo util/rcu.c /^typedef struct lifo {$/;" s file: +lifo_aba_pop util/rcu.c /^node_t *lifo_aba_pop (lifo_t *stk)$/;" f +lifo_aba_push util/rcu.c /^static void lifo_aba_push (lifo_t *stk, node_t *x)$/;" f file: +lifo_alloc util/rcu.c /^static lifo_t *lifo_alloc (void)$/;" f file: +lifo_t util/rcu.c /^} lifo_t;$/;" t typeref:struct:lifo file: +list include/CuTest.h /^ CuTest* list[MAX_TEST_CASES];$/;" m struct:__anon2 +list struct/list.c /^typedef struct list {$/;" s file: +list_ struct/list.c /^static list_t *list_;$/;" v file: +list_alloc struct/list.c /^list_t *list_alloc (void)$/;" f +list_insert struct/list.c /^int list_insert (list_t *list, node_t *item)$/;" f +list_node_alloc struct/list.c /^node_t *list_node_alloc (int key)$/;" f +list_node_init struct/list.c /^static void list_node_init (node_t *item, int key)$/;" f file: +list_print struct/list.c /^void list_print (list_t *list)$/;" f +list_remove struct/list.c /^node_t *list_remove (list_t *list, int key)$/;" f +list_t struct/list.c /^} list_t;$/;" t typeref:struct:list file: +list_test_SRCS makefile /^list_test_SRCS := $(UTIL_SRCS) struct\/list.c$/;" m +lwt_buf_ util/lwt.c /^lwt_buffer_t **lwt_buf_ = NULL;$/;" v +lwt_buffer include/lwt.h /^typedef struct lwt_buffer {$/;" s +lwt_buffer_t include/lwt.h /^} lwt_buffer_t;$/;" t typeref:struct:lwt_buffer +lwt_dump util/lwt.c /^void lwt_dump (const char 
*file_name)$/;" f +lwt_init util/lwt.c /^void lwt_init (void)$/;" f +lwt_record include/lwt.h /^typedef struct lwt_record {$/;" s +lwt_record_t include/lwt.h /^} lwt_record_t;$/;" t typeref:struct:lwt_record +lwt_set_trace_level util/lwt.c /^void lwt_set_trace_level (const char *flags)$/;" f +lwt_thread_init util/lwt.c /^void lwt_thread_init (int thread_id)$/;" f +lwt_trace include/lwt.h /^static inline void lwt_trace (const char *flag, const char *format, size_t value1, size_t value2) {$/;" f +main struct/ht_test.c /^int main (void) {$/;" f +main struct/list.c /^int main (int argc, char **argv)$/;" f +main txn/txn.c /^int main (void) {$/;" f +main util/rcu.c /^int main (void)$/;" f +malloc include/common.h /^#define malloc /;" d +max_probe struct/ht.c /^ int max_probe;$/;" m struct:hash_table_i file: +mem_init util/mem.c /^void mem_init (void) {$/;" f +message include/CuTest.h /^ const char* message;$/;" m struct:CuTest +murmur32 include/murmur.h /^static inline uint32_t murmur32 (const char *key, int len)$/;" f +name include/CuTest.h /^ const char* name;$/;" m struct:CuTest +nbd_defer_free util/rcu.c /^void nbd_defer_free (void *x)$/;" f +nbd_free util/mem.c /^void nbd_free (void *x) {$/;" f +nbd_init util/nbd.c /^void nbd_init (void) {$/;" f +nbd_malloc util/mem.c /^void *nbd_malloc (size_t n) {$/;" f +nbd_thread_init util/nbd.c /^void nbd_thread_init (int id) {$/;" f +next struct/ht.c /^ struct hash_table_i *next;$/;" m struct:hash_table_i typeref:struct:hash_table_i::hash_table_i file: +next struct/list.c /^ struct node *next;$/;" m struct:node typeref:struct:node::node file: +next struct/queue.c /^ struct queue_elem *next;$/;" m struct:queue_elem typeref:struct:queue_elem::queue_elem file: +next util/mem.c /^ struct block *next;$/;" m struct:block typeref:struct:block::block file: +next util/rcu.c /^ struct node *next;$/;" m struct:node typeref:struct:node::node file: +next_free struct/ht.c /^ struct hash_table_i *next_free;$/;" m struct:hash_table_i 
typeref:struct:hash_table_i::hash_table_i file: +node struct/list.c /^typedef struct node {$/;" s file: +node util/rcu.c /^typedef struct node {$/;" s file: +node_alloc util/rcu.c /^node_t *node_alloc (void)$/;" f +node_t struct/list.c /^} node_t;$/;" t typeref:struct:node file: +node_t util/rcu.c /^} node_t;$/;" t typeref:struct:node file: +num_entries_copied struct/ht.c /^ int num_entries_copied;$/;" m struct:hash_table_i file: +num_threads_ struct/list.c /^static long num_threads_;$/;" v file: +owner util/mem.c /^ char owner; \/\/ thread id of owner$/;" m struct:header file: +pending_ util/rcu.c /^static fifo_t **pending_;$/;" v file: +prev txn/txn.c /^ update_rec_t *prev; \/\/ a previous update$/;" m struct:update_rec file: +queue struct/queue.c /^typedef struct queue {$/;" s file: +queue_elem struct/queue.c /^typedef struct queue_elem {$/;" s file: +queue_elem_t struct/queue.c /^} queue_elem_t;$/;" t typeref:struct:queue_elem file: +queue_t struct/queue.c /^} queue_t;$/;" t typeref:struct:queue file: +ran include/CuTest.h /^ int ran;$/;" m struct:CuTest +rcu_ util/rcu.c /^static uint64_t **rcu_;$/;" v file: +rcu_init util/rcu.c /^void rcu_init (void)$/;" f +rcu_post util/rcu.c /^static void rcu_post (uint64_t x)$/;" f file: +rcu_test_SRCS makefile /^rcu_test_SRCS := $(UTIL_SRCS)$/;" m +rcu_update util/rcu.c /^void rcu_update (void)$/;" f +rec txn/txn.c /^ struct { const char *key; update_rec_t *rec; } *writes;$/;" m struct:txn::__anon7 file: +rv txn/txn.c /^ uint64_t rv;$/;" m struct:txn file: +sb_header_ util/mem.c /^static header_t *sb_header_ = NULL;$/;" v file: +scale struct/ht.c /^ unsigned int scale;$/;" m struct:hash_table_i file: +scale util/mem.c /^ char scale; \/\/ log2 of block size$/;" m struct:header file: +scale util/rcu.c /^ uint32_t scale;$/;" m struct:fifo file: +scan struct/ht.c /^ int scan;$/;" m struct:hash_table_i file: +simple_worker struct/ht_test.c /^void *simple_worker (void *arg) {$/;" f +size include/CuTest.h /^ int size;$/;" m 
struct:__anon1 +state txn/txn.c /^ txn_state_t state;$/;" m struct:txn file: +stk_ util/rcu.c /^static lifo_t *stk_;$/;" v file: +table struct/ht.c /^ volatile entry_t *table;$/;" m struct:hash_table_i file: +tail struct/queue.c /^ queue_elem_t *tail;$/;" m struct:queue file: +tail util/rcu.c /^ uint32_t tail;$/;" m struct:fifo file: +tc struct/ht_test.c /^ CuTest *tc;$/;" m struct:worker_data file: +timestamp include/lwt.h /^ uint64_t timestamp;$/;" m struct:lwt_record +txn txn/txn.c /^struct txn {$/;" s file: +txn_abort txn/txn.c /^void txn_abort (txn_t *txn) {$/;" f +txn_access_t include/txn.h /^typedef enum { TXN_READ_WRITE, TXN_READ_ONLY, TXN_BLIND_WRITE } txn_access_t;$/;" t typeref:enum:__anon3 +txn_begin txn/txn.c /^txn_t *txn_begin (txn_access_t access, txn_isolation_t isolation, hash_table_t *ht) {$/;" f +txn_commit txn/txn.c /^txn_state_t txn_commit (txn_t *txn) {$/;" f +txn_ht_get txn/txn.c /^int64_t txn_ht_get (txn_t *txn, const char *key) {$/;" f +txn_ht_put txn/txn.c /^void txn_ht_put (txn_t *txn, const char *key, int64_t value) {$/;" f +txn_ht_validate_key txn/txn.c /^static txn_state_t txn_ht_validate_key (txn_t *txn, const char *key) {$/;" f file: +txn_isolation_t include/txn.h /^typedef enum { TXN_DIRTY_READ, TXN_READ_COMMITTED, TXN_REPEATABLE_READ } txn_isolation_t;$/;" t typeref:enum:__anon4 +txn_state_t txn/txn.c /^typedef enum { TXN_RUNNING, TXN_VALIDATING, TXN_VALIDATED, TXN_ABORTED } txn_state_t;$/;" t typeref:enum:__anon5 file: +txn_t include/txn.h /^typedef struct txn txn_t;$/;" t typeref:struct:txn +txn_test_SRCS makefile /^txn_test_SRCS := $(UTIL_SRCS) struct\/ht.c txn\/txn.c$/;" m +txn_validate txn/txn.c /^static txn_state_t txn_validate (txn_t *txn) {$/;" f file: +type txn/txn.c /^ update_type_t type;$/;" m struct:update_rec file: +uint32_t include/common.h /^typedef unsigned int uint32_t;$/;" t +uint64_t include/common.h /^typedef unsigned long long uint64_t;$/;" t +update_rec txn/txn.c /^struct update_rec {$/;" s file: +update_rec_t 
txn/txn.c /^typedef struct update_rec update_rec_t;$/;" t typeref:struct:update_rec file: +update_type_t txn/txn.c /^typedef enum { UPDATE_TYPE_PUT, UPDATE_TYPE_DELETE } update_type_t;$/;" t typeref:enum:__anon6 file: +value struct/ht.c /^ int64_t value;$/;" m struct:ht_entry file: +value txn/txn.c /^ uint64_t value;$/;" m struct:update_rec file: +value1 include/lwt.h /^ size_t value1;$/;" m struct:lwt_record +value2 include/lwt.h /^ size_t value2;$/;" m struct:lwt_record +version txn/txn.c /^ uint64_t version;$/;" m struct:update_rec file: +wait struct/ht_test.c /^ int *wait;$/;" m struct:worker_data file: +wait_ struct/list.c /^static volatile int wait_;$/;" v file: +wait_ util/rcu.c /^static volatile int wait_;$/;" v file: +worker struct/list.c /^void *worker (void *arg)$/;" f +worker util/rcu.c /^void *worker (void *arg)$/;" f +worker_data struct/ht_test.c /^typedef struct worker_data {$/;" s file: +worker_data_t struct/ht_test.c /^} worker_data_t;$/;" t typeref:struct:worker_data file: +writes txn/txn.c /^ struct { const char *key; update_rec_t *rec; } *writes;$/;" m struct:txn typeref:struct:txn::__anon7 file: +writes_count txn/txn.c /^ uint32_t writes_count;$/;" m struct:txn file: +writes_scan txn/txn.c /^ uint32_t writes_scan;$/;" m struct:txn file: +writes_size txn/txn.c /^ uint32_t writes_size;$/;" m struct:txn file: +wv txn/txn.c /^ uint64_t wv;$/;" m struct:txn file: +x include/lwt.h /^ lwt_record_t x[0];$/;" m struct:lwt_buffer +x util/rcu.c /^ void *x[0];$/;" m struct:fifo file: diff --git a/todo b/todo new file mode 100644 index 0000000..ff93413 --- /dev/null +++ b/todo @@ -0,0 +1,7 @@ +- make rcu wait when its buffer gets full, instead of crashing +- fix makefile to compute dependency info as a side-effect of compilation (-MF) +- investigate 16 byte CAS + - ht can store GUIDs inline instead of pointers to actual keys + - mem can keep tail pointers for free-lists and do O(1) appends +- test ht +- optimize tracing code, still too much overhead diff 
--git a/util/CuTest.c b/util/CuTest.c new file mode 100644 index 0000000..fb4950d --- /dev/null +++ b/util/CuTest.c @@ -0,0 +1,309 @@ +#include +#include +#include +#include +#include +#include + +#include "CuTest.h" + +/*-------------------------------------------------------------------------* + * CuStr + *-------------------------------------------------------------------------*/ + +char* CuStrAlloc(int size) +{ + char* newStr = (char*) malloc( sizeof(char) * (size) ); + return newStr; +} + +char* CuStrCopy(const char* old) +{ + int len = strlen(old); + char* newStr = CuStrAlloc(len + 1); + strcpy(newStr, old); + return newStr; +} + +/*-------------------------------------------------------------------------* + * CuString + *-------------------------------------------------------------------------*/ + +void CuStringInit(CuString* str) +{ + str->length = 0; + str->size = STRING_MAX; + str->buffer = (char*) malloc(sizeof(char) * str->size); + str->buffer[0] = '\0'; +} + +CuString* CuStringNew(void) +{ + CuString* str = (CuString*) malloc(sizeof(CuString)); + str->length = 0; + str->size = STRING_MAX; + str->buffer = (char*) malloc(sizeof(char) * str->size); + str->buffer[0] = '\0'; + return str; +} + +void CuStringResize(CuString* str, int newSize) +{ + str->buffer = (char*) realloc(str->buffer, sizeof(char) * newSize); + str->size = newSize; +} + +void CuStringAppend(CuString* str, const char* text) +{ + int length; + + if (text == NULL) { + text = "NULL"; + } + + length = strlen(text); + if (str->length + length + 1 >= str->size) + CuStringResize(str, str->length + length + 1 + STRING_INC); + str->length += length; + strcat(str->buffer, text); +} + +void CuStringAppendChar(CuString* str, char ch) +{ + char text[2]; + text[0] = ch; + text[1] = '\0'; + CuStringAppend(str, text); +} + +void CuStringAppendFormat(CuString* str, const char* format, ...)
+{ + va_list argp; + char buf[HUGE_STRING_LEN]; + va_start(argp, format); + vsprintf(buf, format, argp); + va_end(argp); + CuStringAppend(str, buf); +} + +void CuStringInsert(CuString* str, const char* text, int pos) +{ + int length = strlen(text); + if (pos > str->length) + pos = str->length; + if (str->length + length + 1 >= str->size) + CuStringResize(str, str->length + length + 1 + STRING_INC); + memmove(str->buffer + pos + length, str->buffer + pos, (str->length - pos) + 1); + str->length += length; + memcpy(str->buffer + pos, text, length); +} + +/*-------------------------------------------------------------------------* + * CuTest + *-------------------------------------------------------------------------*/ + +void CuTestInit(CuTest* t, const char* name, TestFunction function) +{ + t->name = CuStrCopy(name); + t->failed = 0; + t->ran = 0; + t->message = NULL; + t->function = function; + t->jumpBuf = NULL; +} + +CuTest* CuTestNew(const char* name, TestFunction function) +{ + CuTest* tc = CU_ALLOC(CuTest); + CuTestInit(tc, name, function); + return tc; +} + +void CuTestRun(CuTest* tc) +{ + jmp_buf buf; + tc->jumpBuf = &buf; + if (setjmp(buf) == 0) + { + tc->ran = 1; + (tc->function)(tc); + } + tc->jumpBuf = 0; +} + +static void CuFailInternal(CuTest* tc, const char* file, int line, CuString* string) +{ + char buf[HUGE_STRING_LEN]; + + sprintf(buf, "%s:%d: ", file, line); + CuStringInsert(string, buf, 0); + + tc->failed = 1; + tc->message = string->buffer; + if (tc->jumpBuf != 0) longjmp(*(tc->jumpBuf), 0); +} + +void CuFail_Line(CuTest* tc, const char* file, int line, const char* message2, const char* message) +{ + CuString string; + + CuStringInit(&string); + if (message2 != NULL) + { + CuStringAppend(&string, message2); + CuStringAppend(&string, ": "); + } + CuStringAppend(&string, message); + CuFailInternal(tc, file, line, &string); +} + +void CuAssert_Line(CuTest* tc, const char* file, int line, const char* message, int condition) +{ + if (condition)
+ return; + CuFail_Line(tc, file, line, NULL, message); +} + +void CuAssertStrEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, + const char* expected, const char* actual) +{ + CuString string; + if ((expected == NULL && actual == NULL) || + (expected != NULL && actual != NULL && + strcmp(expected, actual) == 0)) + { + return; + } + + CuStringInit(&string); + if (message != NULL) + { + CuStringAppend(&string, message); + CuStringAppend(&string, ": "); + } + CuStringAppend(&string, "expected <"); + CuStringAppend(&string, expected); + CuStringAppend(&string, "> but was <"); + CuStringAppend(&string, actual); + CuStringAppend(&string, ">"); + CuFailInternal(tc, file, line, &string); +} + +void CuAssertIntEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, + int expected, int actual) +{ + char buf[STRING_MAX]; + if (expected == actual) return; + sprintf(buf, "expected <%d> but was <%d>", expected, actual); + CuFail_Line(tc, file, line, message, buf); +} + +void CuAssertDblEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, + double expected, double actual, double delta) +{ + char buf[STRING_MAX]; + if (fabs(expected - actual) <= delta) return; + sprintf(buf, "expected <%lf> but was <%lf>", expected, actual); + CuFail_Line(tc, file, line, message, buf); +} + +void CuAssertPtrEquals_LineMsg(CuTest* tc, const char* file, int line, const char* message, + void* expected, void* actual) +{ + char buf[STRING_MAX]; + if (expected == actual) return; + sprintf(buf, "expected pointer <0x%p> but was <0x%p>", expected, actual); + CuFail_Line(tc, file, line, message, buf); +} + + +/*-------------------------------------------------------------------------* + * CuSuite + *-------------------------------------------------------------------------*/ + +void CuSuiteInit(CuSuite* testSuite) +{ + testSuite->count = 0; + testSuite->failCount = 0; +} + +CuSuite* CuSuiteNew(void) +{ + CuSuite* testSuite =
CU_ALLOC(CuSuite); + CuSuiteInit(testSuite); + return testSuite; +} + +void CuSuiteAdd(CuSuite* testSuite, CuTest *testCase) +{ + assert(testSuite->count < MAX_TEST_CASES); + testSuite->list[testSuite->count] = testCase; + testSuite->count++; +} + +void CuSuiteAddSuite(CuSuite* testSuite, CuSuite* testSuite2) +{ + int i; + for (i = 0 ; i < testSuite2->count ; ++i) + { + CuTest* testCase = testSuite2->list[i]; + CuSuiteAdd(testSuite, testCase); + } +} + +void CuSuiteRun(CuSuite* testSuite) +{ + int i; + for (i = 0 ; i < testSuite->count ; ++i) + { + CuTest* testCase = testSuite->list[i]; + CuTestRun(testCase); + if (testCase->failed) { testSuite->failCount += 1; } + } +} + +void CuSuiteSummary(CuSuite* testSuite, CuString* summary) +{ + int i; + for (i = 0 ; i < testSuite->count ; ++i) + { + CuTest* testCase = testSuite->list[i]; + CuStringAppend(summary, testCase->failed ? "F" : "."); + } + CuStringAppend(summary, "\n\n"); +} + +void CuSuiteDetails(CuSuite* testSuite, CuString* details) +{ + int i; + int failCount = 0; + + if (testSuite->failCount == 0) + { + int passCount = testSuite->count - testSuite->failCount; + const char* testWord = passCount == 1 ?
"test" : "tests"; + CuStringAppendFormat(details, "OK (%d %s)\n", passCount, testWord); + } + else + { + if (testSuite->failCount == 1) + CuStringAppend(details, "There was 1 failure:\n"); + else + CuStringAppendFormat(details, "There were %d failures:\n", testSuite->failCount); + + for (i = 0 ; i < testSuite->count ; ++i) + { + CuTest* testCase = testSuite->list[i]; + if (testCase->failed) + { + failCount++; + CuStringAppendFormat(details, "%d) %s: %s\n", + failCount, testCase->name, testCase->message); + } + } + CuStringAppend(details, "\n!!!FAILURES!!!\n"); + + CuStringAppendFormat(details, "Runs: %d ", testSuite->count); + CuStringAppendFormat(details, "Passes: %d ", testSuite->count - testSuite->failCount); + CuStringAppendFormat(details, "Fails: %d\n", testSuite->failCount); + } +} diff --git a/util/lwt.c b/util/lwt.c new file mode 100644 index 0000000..d7b9862 --- /dev/null +++ b/util/lwt.c @@ -0,0 +1,111 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * lightweight tracing + */ +#include + +#include "common.h" +#include "tls.h" +#include "lwt.h" +#include "mem.h" + +DECLARE_THREAD_LOCAL(tb_, int); // NOTE(review): lwt_thread_init() stores an lwt_buffer_t * here; confirm tls.h keeps the full pointer for an int-typed thread local + +lwt_buffer_t *lwt_buf_[MAX_NUM_THREADS] = {}; +uint64_t flag_mask_ = 0; +static int buf_count_ = 0; +static const char *flags_ = ""; + +void lwt_init (void) +{ + INIT_THREAD_LOCAL(tb_, NULL); +} + +void lwt_thread_init (int thread_id) +{ + assert(thread_id < MAX_NUM_THREADS); + if (lwt_buf_[thread_id] == NULL) { + lwt_buf_[thread_id] = (lwt_buffer_t *)nbd_malloc(sizeof(lwt_buffer_t) + sizeof(lwt_record_t) * LWT_BUFFER_SIZE); + SYNC_ADD(&buf_count_, 1); + memset(lwt_buf_[thread_id], 0, sizeof(lwt_buffer_t)); + SET_THREAD_LOCAL(tb_, lwt_buf_[thread_id]); + } +} + +void lwt_set_trace_level (const char *flags) // flags: pairs of (category letter, max level), e.g. "m0r0"; kept by reference and read again by lwt_dump() +{ + assert(strlen(flags) % 2 == 0); // a well formed flags string should be an even number of characters long + flags_ = flags; + int i; + for (i = 0; flags[i]; i+=2) {
+ flag_mask_ |= 1ULL << (flags[i] - 'A'); // 64-bit shift: categories such as 'm' sit more than 31 bits above 'A', which overflows an int shift + } +} + +static inline void dump_record (FILE *file, int thread_id, lwt_record_t *r, uint64_t offset) +{ + // print the record if its trace category is enabled at a high enough level + int flag = (size_t)r->format >> 56; + int level = ((size_t)r->format >> 48) & 0xFF; + const char *f = strchr(flags_, flag); + if (f != NULL && level <= f[1]) { + char s[3] = {flag, level, '\0'}; + fprintf(file, "%09llu %d %s ", ((uint64_t)r->timestamp - offset) >> 6, thread_id, s); + const char *format = (const char *)(((uint64_t)r->format << 16) >> 16); // strip out the embedded flags + fprintf(file, format, r->value1, r->value2); + fprintf(file, "\n"); + } +} + +static void dump_buffer (FILE *file, int thread_id, uint64_t offset) +{ + assert(thread_id < buf_count_); + + lwt_buffer_t *tb = lwt_buf_[thread_id]; + int i; + if (tb->head >= LWT_BUFFER_SIZE) { // >=: with exactly LWT_BUFFER_SIZE records every slot is full and head & LWT_BUFFER_MASK is 0 (assumes MASK == SIZE - 1), so the second loop alone would print nothing + for (i = tb->head & LWT_BUFFER_MASK; i < LWT_BUFFER_SIZE; ++i) { + dump_record(file, thread_id, tb->x + i, offset); + } + } + + for (i = 0; i < (tb->head & LWT_BUFFER_MASK); ++i) { + dump_record(file, thread_id, tb->x + i, offset); + } +} + +void lwt_dump (const char *file_name) +{ + uint64_t offset = (uint64_t)-1; + int i; + + for (i = 0; i < buf_count_; ++i) { + if (lwt_buf_[i] != NULL && lwt_buf_[i]->head != 0) { + uint64_t x = lwt_buf_[i]->x[0].timestamp; + if (x < offset) { + offset = x; + } + if (lwt_buf_[i]->head >= LWT_BUFFER_SIZE) + { + x = lwt_buf_[i]->x[lwt_buf_[i]->head & LWT_BUFFER_MASK].timestamp; + if (x < offset) { + offset = x; + } + } + } + } + + if (offset != (uint64_t)-1) { + FILE *file = fopen(file_name, "w"); + assert(file); + for (i = 0; i < buf_count_; ++i) { + if (lwt_buf_[i] != NULL) { + dump_buffer(file, i, offset); + } + } + fflush(file); + fclose(file); + } +} diff --git a/util/mem.c b/util/mem.c new file mode 100644 index 0000000..0967c4a --- /dev/null +++ b/util/mem.c @@ -0,0 +1,151 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained
at + * http://creativecommons.org/licenses/publicdomain + * + * Extremely fast multi-threaded malloc. 64 bit platforms only! + */ +#include +#include +#include +#include "common.h" +#include "lwt.h" +#include "tls.h" + +#define GET_SCALE(n) (sizeof(n)*8-__builtin_clzl((n)-1)) // log2 of n, rounded up +#define MAX_SCALE 31 // allocate blocks up to 2GB in size (arbitrary, could be bigger) +#define REGION_SCALE 22 // 4MB regions +#define REGION_SIZE (1 << REGION_SCALE) +#define HEADER_REGION_SCALE 22 // 4MB is space enough for headers for over 2,000,000 regions + +typedef struct block { + struct block *next; +} block_t; + +// region header +typedef struct header { + char owner; // thread id of owner + char scale; // log2 of the block size +} header_t; + +static header_t *region_header_ = NULL; // indexed by (address >> REGION_SCALE). NOTE(review): nothing in this file writes a header after mem_init() zeroes the table, so nbd_free() files every block under free_list_[0][0][tid_]; mmap() also only guarantees page alignment, so one region can straddle two header slots -- both need fixing before the headers can be trusted + +// TODO: experiment with different memory layouts (i.e. separate private and public lists) +static block_t free_list_[MAX_NUM_THREADS][MAX_SCALE+1][MAX_NUM_THREADS]; + +static void *get_new_region (int scale) { // map 2^scale bytes of fresh memory, never less than one region (2^REGION_SCALE bytes); exits on failure + if (scale < REGION_SCALE) { + scale = REGION_SCALE; + } + TRACE("m0", "get_new_region(): mmap new region scale: %llu", scale, 0); + void *region = mmap(NULL, ((size_t)1 << scale), PROT_READ|PROT_WRITE, MAP_ANON|MAP_PRIVATE, -1, 0); // size_t shift: 1 << MAX_SCALE (31) overflows an int + if (region == (void *)-1) { + perror("get_new_region: mmap"); + exit(-1); + } + assert(region); + return region; +} + +void mem_init (void) { + assert(region_header_ == NULL); + region_header_ = (header_t *)get_new_region(HEADER_REGION_SCALE); + memset(region_header_, 0, (size_t)1 << HEADER_REGION_SCALE); // clear the header table that was just mapped (not REGION_SIZE, which only coincides) +} + +// Put x onto its owner's public free list (in the appropriate size bin). +// +// TODO: maybe we want to munmap() larger size blocks to reclaim virtual address space?
+void nbd_free (void *x) { + LOCALIZE_THREAD_LOCAL(tid_, int); + block_t *b = (block_t *)x; + assert(((size_t)b >> REGION_SCALE) < ((1 << HEADER_REGION_SCALE) / sizeof(header_t))); + header_t *h = region_header_ + ((size_t)b >> REGION_SCALE); + TRACE("m0", "nbd_free(): block %p scale %llu", x, h->scale); + block_t *l = &free_list_[(int)h->owner][(int)h->scale][tid_]; + TRACE("m0", "nbd_free(): free list %p first block %p", l, l->next); + b->next = l->next; + l->next = b; +} + +// Allocate a block of memory at least size . Blocks are binned in powers-of-two. Round up +// to the nearest power-of-two. +// +// First check the current thread's private free list for an available block. If no blocks are on +// the private free list, pull all the available blocks off of the current thread's public free +// lists and put them on the private free list. If we didn't find any blocks on the public free +// lists, open a new region, break it up into blocks and put them on the private free list. +void *nbd_malloc (size_t n) { + LOCALIZE_THREAD_LOCAL(tid_, int); + if (n < sizeof(block_t)) { + n = sizeof(block_t); + } + int b_scale = GET_SCALE(n); + assert(b_scale <= MAX_SCALE); + TRACE("m0", "nbd_malloc(): size %llu scale %llu", n, b_scale); + block_t *fls = free_list_[tid_][b_scale]; // our free lists + block_t *pri = fls + tid_; // our private free list + TRACE("m0", "nbd_malloc(): private free list %p first block %p", pri, pri->next); + + // If our private free list is empty, fill it up with blocks from our public free lists + if (EXPECT_FALSE(pri->next == NULL)) { + int cnt = 0; + block_t *last = pri; + for (int i = 0; i < MAX_NUM_THREADS; ++i) { + TRACE("m0", "nbd_malloc(): searching public free lists (%llu)", i, 0); + block_t *pub = fls + i; // one of our public free lists + TRACE("m0", "nbd_malloc(): public free list %p first block %p", pub, pub->next); + if (EXPECT_FALSE(pub == pri)) + continue; + + if (pub->next != NULL) { + block_t *stolen = SYNC_SWAP(&pub->next, NULL); 
+ TRACE("m0", "nbd_malloc(): stole list %p first block %p", pub, stolen); + if (stolen) { + last->next = stolen; + TRACE("m0", "nbd_malloc(): append to last block %p of private free list", last, 0); + while (last->next) { + ++cnt; + TRACE("m0", "nbd_malloc(): find last block in list: last %p last->next %p", + last, last->next); + last = last->next; + } + } + } + } + TRACE("m0", "nbd_malloc(): moved %llu blocks from public to private free lists", cnt, 0); + + if (b_scale >= REGION_SCALE) { + if (cnt == 0) { + assert(pri->next == NULL); + pri->next = (block_t *)get_new_region(b_scale); + assert(pri->next->next == NULL); + } + assert(pri->next); + + } else if (cnt < (1 << (REGION_SCALE - b_scale - 1))) { + + // Even if we took a few blocks from our public lists we still break open a new region. + // This guarentees that we are amortizing the cost of accessing our public lists accross + // many nbd_malloc() calls. + char *region = get_new_region(b_scale); + size_t b_size = 1 << b_scale; + for (int i = REGION_SIZE; i != 0; i -= b_size) { + block_t *b = (block_t *)(region + i - b_size); + b->next = pri->next; + //TRACE("m1", "nbd_malloc(): put new block %p ahead of %p on private list", b, b->next); + pri->next = b; + *b = *b; + } + } + + assert(pri->next); + } + + // Pull a block off of our private free list. 
+ block_t *b = pri->next; + TRACE("m0", "nbd_malloc(): take block %p off of of private list (new head is %p)", b, pri->next); + pri->next = b->next; + + assert(b); + return b; +} diff --git a/util/nbd.c b/util/nbd.c new file mode 100644 index 0000000..ed93e8f --- /dev/null +++ b/util/nbd.c @@ -0,0 +1,24 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + */ +#include "common.h" +#include "rcu.h" +#include "lwt.h" +#include "mem.h" +#include "nbd.h" +#include "tls.h" + +DECLARE_THREAD_LOCAL(tid_, int); + +void nbd_init (void) { + INIT_THREAD_LOCAL(tid_, NULL); + mem_init(); + lwt_init(); +} + +void nbd_thread_init (int id) { + SET_THREAD_LOCAL(tid_, id); + lwt_thread_init(id); + rcu_thread_init(id); +} diff --git a/util/rcu.c b/util/rcu.c new file mode 100644 index 0000000..dfa49c8 --- /dev/null +++ b/util/rcu.c @@ -0,0 +1,211 @@ +/* + * Written by Josh Dybnis and released to the public domain, as explained at + * http://creativecommons.org/licenses/publicdomain + * + * safe memory reclemation using a simple technique from rcu + */ +#include +#include "common.h" +#include "rcu.h" +#include "lwt.h" +#include "mem.h" +#include "nbd.h" +#include "tls.h" + +#define RCU_POST_THRESHOLD 10 +#define RCU_QUEUE_SCALE 20 + +typedef struct fifo { + uint32_t head; + uint32_t tail; + uint32_t scale; + void *x[0]; +} fifo_t; + +static uint64_t rcu_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {}; +static uint64_t rcu_last_posted_[MAX_NUM_THREADS][MAX_NUM_THREADS] = {}; +static fifo_t *pending_[MAX_NUM_THREADS] = {}; +static int num_threads_ = 0; + +static fifo_t *fifo_alloc(int scale) { + fifo_t *q = (fifo_t *)nbd_malloc(sizeof(fifo_t) + (1 << scale) * sizeof(void *)); + memset(q, 0, sizeof(fifo_t)); + q->scale = scale; + q->head = 0; + q->tail = 0; + return q; +} + +static uint32_t fifo_index (fifo_t *q, uint32_t i) { + return i & MASK(q->scale); +} + +static void fifo_enqueue (fifo_t *q, void *x) { + 
assert(fifo_index(q, q->head + 1) != fifo_index(q, q->tail)); + uint32_t i = fifo_index(q, q->head++); + q->x[i] = x; +} + +static void *fifo_dequeue (fifo_t *q) { + uint32_t i = fifo_index(q, q->tail++); + return q->x[i]; +} + +void rcu_thread_init (int id) { + assert(id < MAX_NUM_THREADS); + if (pending_[id] == NULL) { + pending_[id] = fifo_alloc(RCU_QUEUE_SCALE); + SYNC_ADD(&num_threads_, 1); + } +} + +static void rcu_post (uint64_t x) { + LOCALIZE_THREAD_LOCAL(tid_, int); + if (x - rcu_last_posted_[tid_][tid_] < RCU_POST_THRESHOLD) + return; + + int next_thread_id = (tid_ + 1) % num_threads_; + + TRACE("r0", "rcu_post: %llu", x, 0); + rcu_[next_thread_id][tid_] = rcu_last_posted_[tid_][tid_] = x; +} + +void rcu_update (void) { + LOCALIZE_THREAD_LOCAL(tid_, int); + assert(tid_ < num_threads_); + int next_thread_id = (tid_ + 1) % num_threads_; + int i; + for (i = 0; i < num_threads_; ++i) { + if (i == tid_) + continue; + + // No need to post an update if the value hasn't changed + if (rcu_[tid_][i] == rcu_last_posted_[tid_][i]) + continue; + + uint64_t x = rcu_[tid_][i]; + rcu_[next_thread_id][i] = rcu_last_posted_[tid_][i] = x; + } + + // free + while (pending_[tid_]->tail != rcu_[tid_][tid_]) { + nbd_free(fifo_dequeue(pending_[tid_])); + } +} + +void nbd_defer_free (void *x) { + LOCALIZE_THREAD_LOCAL(tid_, int); + fifo_enqueue(pending_[tid_], x); + TRACE("r0", "nbd_defer_free: put %p on queue at position %llu", x, pending_[tid_]->head); + rcu_post(pending_[tid_]->head); +} + +#ifdef MAKE_rcu_test +#include +#include +#include + +#define NUM_ITERATIONS 10000000 + +typedef struct node { + struct node *next; +} node_t; + +typedef struct lifo { + node_t *head; +} lifo_t; + +static volatile int wait_; +static lifo_t *stk_; + +static lifo_t *lifo_alloc (void) { + lifo_t *stk = (lifo_t *)nbd_malloc(sizeof(lifo_t)); + memset(stk, 0, sizeof(lifo_t)); + return stk; +} + +static void lifo_aba_push (lifo_t *stk, node_t *x) { + node_t *head; + do { + head = ((volatile 
lifo_t *)stk)->head; + ((volatile node_t *)x)->next = head; + } while (__sync_val_compare_and_swap(&stk->head, head, x) != head); +} + +node_t *lifo_aba_pop (lifo_t *stk) { + node_t *head; + do { + head = ((volatile lifo_t *)stk)->head; + if (head == NULL) + return NULL; + } while (__sync_val_compare_and_swap(&stk->head, head, head->next) != head); + head->next = NULL; + return head; +} + +node_t *node_alloc (void) { + node_t *node = (node_t *)nbd_malloc(sizeof(node_t)); + memset(node, 0, sizeof(node_t)); + return node; +} + +void *worker (void *arg) { + int id = (int)(size_t)arg; + unsigned int rand_seed = (unsigned int)id + 1; + nbd_thread_init(id); + + // Wait for all the worker threads to be ready. + __sync_fetch_and_add(&wait_, -1); + do {} while (wait_); + + int i; + for (i = 0; i < NUM_ITERATIONS; ++ i) { + int n = rand_r(&rand_seed); + if (n & 0x1) { + lifo_aba_push(stk_, node_alloc()); + } else { + node_t *x = lifo_aba_pop(stk_); + if (x) { + nbd_defer_free(x); + } + } + rcu_update(); + } + + return NULL; +} + +int main (int argc, char **argv) { + nbd_init(); + //lwt_set_trace_level("m0r0"); + + int num_threads = 2; + if (argc == 2) + { + errno = 0; + num_threads = strtol(argv[1], NULL, 10); + if (errno) { + fprintf(stderr, "%s: Invalid argument for number of threads\n", argv[0]); + return -1; + } + if (num_threads <= 0) { + fprintf(stderr, "%s: Number of threads must be at least 1\n", argv[0]); + return -1; + } + } + + stk_ = lifo_alloc(); + wait_ = num_threads; + + pthread_t thread[num_threads]; + for (int i = 0; i < num_threads; ++i) { + int rc = pthread_create(thread + i, NULL, worker, (void *)(size_t)i); + if (rc != 0) { perror("pthread_create"); return rc; } + } + for (int i = 0; i < num_threads; ++i) { + pthread_join(thread[i], NULL); + } + + return 0; +} +#endif//rcu_test -- 2.40.0