Last active
December 21, 2025 20:04
-
-
Save cabecada/d22224010648ca94459a317fbd0836b7 to your computer and use it in GitHub Desktop.
Test sharding using MurmurHash3 and jump consistent hashing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| postgres@ubuntu:/mnt/VHD/consistent$ cat pg_consistent.c | |
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "utils/builtins.h"
| PG_MODULE_MAGIC; | |
/*
 * --- MurmurHash3 Engine ---
 *
 * MurmurHash3_x86_32 over the first `len` bytes of `key`.
 *
 * Each 4-byte block is assembled byte-by-byte as a little-endian word rather
 * than read through a `const uint32_t *` cast. The cast is undefined
 * behaviour when `key` is not 4-byte aligned (and violates strict aliasing),
 * and it makes the hash -- and every shard assignment derived from it --
 * depend on the host byte order. On little-endian hosts the results are
 * identical to the cast-based version.
 *
 * key:  bytes to hash; may contain NUL bytes and need not be aligned
 * len:  number of bytes to hash; must be >= 0
 * seed: hash seed
 * Returns the 32-bit hash value.
 */
static uint32_t murmur3_32(const char *key, int len, uint32_t seed) {
    const uint8_t *data = (const uint8_t *)key;
    const uint32_t c1 = 0xcc9e2d51, c2 = 0x1b873593;
    const int nblocks = len / 4;
    uint32_t h1 = seed;
    uint32_t k1;

    /* Body: mix in every complete 4-byte block. */
    for (int i = 0; i < nblocks; i++) {
        const uint8_t *p = data + 4 * i;

        k1 = (uint32_t)p[0]
           | ((uint32_t)p[1] << 8)
           | ((uint32_t)p[2] << 16)
           | ((uint32_t)p[3] << 24);
        k1 *= c1;
        k1 = (k1 << 15) | (k1 >> 17);
        k1 *= c2;

        h1 ^= k1;
        h1 = (h1 << 13) | (h1 >> 19);
        h1 = h1 * 5 + 0xe6546b64;
    }

    /* Tail: the remaining 0-3 bytes. */
    const uint8_t *tail = data + 4 * nblocks;
    k1 = 0;
    switch (len & 3) {
    case 3:
        k1 ^= (uint32_t)tail[2] << 16;
        /* fallthrough */
    case 2:
        k1 ^= (uint32_t)tail[1] << 8;
        /* fallthrough */
    case 1:
        k1 ^= tail[0];
        k1 *= c1;
        k1 = (k1 << 15) | (k1 >> 17);
        k1 *= c2;
        h1 ^= k1;
        break;
    default:
        break;
    }

    /* Finalization: fold in the length, then avalanche all bits. */
    h1 ^= (uint32_t)len;
    h1 ^= h1 >> 16;
    h1 *= 0x85ebca6b;
    h1 ^= h1 >> 13;
    h1 *= 0xc2b2ae35;
    h1 ^= h1 >> 16;
    return h1;
}
/*
 * --- Jump Consistent Hash ---
 *
 * Lamping & Veach jump consistent hash: maps a 64-bit key to a bucket in
 * [0, num_buckets). The key seeds a 64-bit linear congruential generator
 * whose successive outputs decide how far ahead the candidate bucket jumps.
 *
 * Returns -1 when num_buckets <= 0 (the loop body never runs); callers are
 * expected to reject such bucket counts before calling.
 */
static int32_t jump_consistent_hash(uint64_t key, int32_t num_buckets) {
    int64_t bucket = -1;
    int64_t candidate = 0;

    while (candidate < num_buckets) {
        bucket = candidate;

        /* Advance the LCG; its top 31 bits drive the next jump distance. */
        key = key * 2862933555777941757ULL + 1;

        const double scale = (double)(1LL << 31) / (double)((key >> 33) + 1);
        candidate = (int64_t)((bucket + 1) * scale);
    }

    return (int32_t)bucket;
}
| PG_FUNCTION_INFO_V1(get_multi_shard); | |
| Datum get_multi_shard(PG_FUNCTION_ARGS) { | |
| int32 num_shards = PG_GETARG_INT32(3); | |
| char combined_key[1024] = {0}; | |
| int offset = 0; | |
| for (int i = 0; i < 3; i++) { | |
| if (!PG_ARGISNULL(i)) { | |
| text *arg = PG_GETARG_TEXT_PP(i); | |
| char *str = text_to_cstring(arg); | |
| int len = strlen(str); | |
| if (offset + len + 1 < 1024) { | |
| memcpy(combined_key + offset, str, len); | |
| offset += len; | |
| combined_key[offset++] = '|'; // Separator | |
| } | |
| } | |
| } | |
| uint32_t h = murmur3_32(combined_key, offset, 0); | |
| PG_RETURN_INT32(jump_consistent_hash((uint64_t)h, num_shards)); | |
| } | |
| // 1. Function to get the shard for a key | |
| PG_FUNCTION_INFO_V1(get_jump_shard); | |
| Datum get_jump_shard(PG_FUNCTION_ARGS) { | |
| text *key_txt = PG_GETARG_TEXT_PP(0); | |
| int32 num_shards = PG_GETARG_INT32(1); | |
| char *key = text_to_cstring(key_txt); | |
| uint32_t h = murmur3_32(key, strlen(key), 0); | |
| PG_RETURN_INT32(jump_consistent_hash((uint64_t)h, num_shards)); | |
| } | |
| // 2. Function to check movement on node count change | |
| PG_FUNCTION_INFO_V1(check_shard_movement); | |
| Datum check_shard_movement(PG_FUNCTION_ARGS) { | |
| int32 num_keys = PG_GETARG_INT32(0); | |
| int32 old_shards = PG_GETARG_INT32(1); | |
| int32 new_shards = PG_GETARG_INT32(2); | |
| int32 moved = 0; | |
| for (int i = 0; i < num_keys; i++) { | |
| // Use loop index as a pseudo-key | |
| uint32_t h = murmur3_32((char*)&i, sizeof(int), 0); | |
| if (jump_consistent_hash(h, old_shards) != jump_consistent_hash(h, new_shards)) { | |
| moved++; | |
| } | |
| } | |
| PG_RETURN_INT32(moved); | |
| } | |
| // 3. Simple hash for ID-based sharding | |
| PG_FUNCTION_INFO_V1(get_jump_shard_int); | |
| Datum get_jump_shard_int(PG_FUNCTION_ARGS) { | |
| int64 key = PG_GETARG_INT64(0); | |
| int32 num_shards = PG_GETARG_INT32(1); | |
| PG_RETURN_INT32(jump_consistent_hash((uint64_t)key, num_shards)); | |
| } | |
| ---------------------------------------------------------------------- | |
| deploy | |
# Compile to an object file with the same CFLAGS the server was built with
# (optimisation level, -fno-strict-aliasing, -fwrapv, ...), plus -fPIC for a
# shared object. $(pg_config --cflags) is intentionally unquoted so it splits
# into separate flags.
gcc $(pg_config --cflags) -fPIC -c pg_consistent.c -I"$(pg_config --includedir-server)"
# Link into a shared object (.so)
gcc -shared -o pg_consistent.so pg_consistent.o
# Install into the server's package library directory, where
# AS 'pg_consistent' is resolved.
sudo cp pg_consistent.so "$(pg_config --pkglibdir)"
| ---------------------------------------------------------------------- | |
| deploy the functions | |
psql <<'SQL'
-- 1. Get shard for string keys (e.g., usernames, emails)
CREATE OR REPLACE FUNCTION get_jump_shard(text, int)
RETURNS int AS 'pg_consistent', 'get_jump_shard'
LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
-- 2. Get shard for integer keys (e.g., user_id)
CREATE OR REPLACE FUNCTION get_jump_shard_int(bigint, int)
RETURNS int AS 'pg_consistent', 'get_jump_shard_int'
LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
-- 3. Simulate movement (sample_size, old_count, new_count)
CREATE OR REPLACE FUNCTION check_shard_movement(int, int, int)
RETURNS int AS 'pg_consistent', 'check_shard_movement'
LANGUAGE C IMMUTABLE STRICT PARALLEL SAFE;
-- 4. Composite key of up to three text parts. Not STRICT, so individual keys
--    may be NULL (NULL keys are skipped when building the hashed key).
--    num_shards is the last parameter, so when passing fewer than three keys
--    supply it by name:  SELECT get_multi_shard('ACC_1', num_shards => 16);
CREATE OR REPLACE FUNCTION get_multi_shard(
  key1 text DEFAULT NULL,
  key2 text DEFAULT NULL,
  key3 text DEFAULT NULL,
  num_shards int DEFAULT 1
)
RETURNS int AS 'pg_consistent', 'get_multi_shard'
LANGUAGE C IMMUTABLE PARALLEL SAFE;
SQL
--- test
CREATE TABLE trade_test (
  trade_id BIGSERIAL PRIMARY KEY,
  account_id TEXT NOT NULL,
  symbol TEXT NOT NULL,
  trade_date DATE NOT NULL,
  quantity NUMERIC(18, 4),
  price NUMERIC(18, 4),
  -- Shard this row maps to on a 16-node layout; populated by the INSERT below
  -- (a plain column, not a generated/virtual one).
  shard_idx INT
);
-- Compute shard_idx while inserting instead of issuing a follow-up UPDATE,
-- which would rewrite all 100,000 rows and leave as many dead tuples behind.
INSERT INTO trade_test (account_id, symbol, trade_date, quantity, price, shard_idx)
SELECT
  src.account_id,
  src.symbol,
  src.trade_date,
  src.quantity,
  src.price,
  get_multi_shard(src.account_id, NULL, NULL, 16)
FROM (
  SELECT
    'ACC_' || (lpile.i % 1000)::text AS account_id,                           -- 1,000 unique accounts
    (ARRAY['GME', 'CASH', 'TSLA', 'SPY'])[floor(random() * 4 + 1)] AS symbol, -- Random ticker
    '2025-01-01'::date + (random() * 350)::int AS trade_date,                -- Dates across 2025
    (random() * 100)::numeric AS quantity,
    (random() * 500)::numeric AS price
  FROM generate_series(1, 100000) AS lpile(i)
) AS src;
-- Check distribution:
SELECT shard_idx, count(*)
FROM trade_test GROUP BY 1 ORDER BY 1;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment