commit 19a5846774e09848d49d87a348e55802a6ff4b2d
Author: William Casarin
Date:   Wed Mar 19 15:12:52 2025 -0700

    relay index wip

    Start tracking which relays a note was seen on:

    - add struct ndb_ingest_meta plus ndb_process_event_with() and
      ndb_process_events_with() so callers can tag ingested events with
      the relay they came from
    - when an incoming note is already in the database, queue an
      NDB_WRITER_NOTE_RELAY message if that relay isn't recorded yet
    - add the note_relays (note key -> relays) and relay_kind
      (relay + kind + created_at + note key) databases

    Signed-off-by: William Casarin

diff --git a/.gitignore b/.gitignore
index 1dcbb43cde37..d2a3b3eb2442 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,6 @@
 tags
+build.log
+*.swp
 data.mdb
 lock.mdb
 v0-lock
diff --git a/TODO b/TODO
index 2225760540fc..667545f3a3a2 100644
--- a/TODO
+++ b/TODO
@@ -1,5 +1,3 @@
-subscription polling
-execution plan for created_at query
-note kind index rebuild migration
-(A) filter from json
-tags index migration
+add +relay even if we have note already
+write +relay notekind index
+write note_id -> +relay index
diff --git a/src/nostrdb.c b/src/nostrdb.c
index 9fdb1698c461..8377a37beb5b 100644
--- a/src/nostrdb.c
+++ b/src/nostrdb.c
@@ -129,6 +129,8 @@ struct ndb_ingest_controller
 {
 	MDB_txn *read_txn;
 	struct ndb_lmdb *lmdb;
+	struct ndb_note *note;  // existing note, set when the id is already in the db
+	uint64_t note_key;      // primary key of `note`
 };
 
 enum ndb_writer_msgtype {
@@ -136,9 +138,10 @@ enum ndb_writer_msgtype {
 	NDB_WRITER_NOTE, // write a note to the db
 	NDB_WRITER_PROFILE, // write a profile to the db
 	NDB_WRITER_DBMETA, // write ndb metadata
 	NDB_WRITER_PROFILE_LAST_FETCH, // when profiles were last fetched
 	NDB_WRITER_BLOCKS, // write parsed note blocks
 	NDB_WRITER_MIGRATE, // migrate the database
+	NDB_WRITER_NOTE_RELAY, // we already have the note, but we have more relays to write
 };
 
 // keys used for storing data in the NDB metadata database (NDB_DB_NDB_META)
@@ -1475,6 +1478,7 @@ static int ndb_db_is_index(enum ndb_dbs index)
 	case NDB_DB_NDB_META:
 	case NDB_DB_PROFILE_SEARCH:
 	case NDB_DB_PROFILE_LAST_FETCH:
+	case NDB_DB_NOTE_RELAYS:
 	case NDB_DBS:
 		return 0;
 	case NDB_DB_PROFILE_PK:
@@ -1484,6 +1488,7 @@ static int ndb_db_is_index(enum ndb_dbs index)
 	case NDB_DB_NOTE_TAGS:
 	case NDB_DB_NOTE_PUBKEY:
 	case NDB_DB_NOTE_PUBKEY_KIND:
+	case NDB_DB_NOTE_RELAY_KIND:
 		return 1;
 	}
 
@@ -1499,6 +1504,14 @@ static inline void ndb_id_u64_ts_init(struct ndb_id_u64_ts *key,
 	key->timestamp = timestamp;
 }
 
+static int ndb_write_note_relay_kind_index(struct ndb_txn *txn,
+					   struct ndb_note *note,
+					   uint64_t note_key)
+{
+	assert(!"todo! ndb_write_note_relay_kind_index");
+	return 0;
+}
+
 static int ndb_write_note_pubkey_index(struct ndb_txn *txn, struct ndb_note *note,
 				       uint64_t note_key)
 {
@@ -1601,6 +1614,7 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices,
 		case NDB_DB_NDB_META:
 		case NDB_DB_PROFILE_SEARCH:
 		case NDB_DB_PROFILE_LAST_FETCH:
+		case NDB_DB_NOTE_RELAYS:
 		case NDB_DBS:
 			// this should never happen since we check at
 			// the start
@@ -1620,6 +1634,9 @@ static int ndb_rebuild_note_indices(struct ndb_txn *txn, enum ndb_dbs *indices,
 				goto cleanup;
 			}
 			break;
+		case NDB_DB_NOTE_RELAY_KIND:
+			fprintf(stderr, "it doesn't make sense to rebuild note relay kind index\n");
+			return 0;
 		case NDB_DB_NOTE_PUBKEY_KIND:
 			if (!ndb_write_note_pubkey_kind_index(txn, note, note_key)) {
 				count = -1;
@@ -1817,14 +1834,23 @@ enum ndb_ingester_msgtype {
 };
 
 struct ndb_ingester_event {
+	const char *relay; // heap copy of the source relay url, or NULL
 	char *json;
 	unsigned client : 1; // ["EVENT", {...}] messages
 	unsigned len : 31;
 };
 
+struct ndb_writer_note_relay {
+	const char *relay; // handed to the writer thread with this message
+	uint64_t note_key;
+	uint64_t kind;
+	uint64_t created_at;
+};
+
 struct ndb_writer_note {
 	struct ndb_note *note;
 	size_t note_len;
+	const char *relay; // relay the note came from, or NULL
 };
 
 struct ndb_writer_profile {
@@ -1862,6 +1888,7 @@ struct ndb_writer_blocks {
 struct ndb_writer_msg {
 	enum ndb_writer_msgtype type;
 	union {
+		struct ndb_writer_note_relay note_relay;
 		struct ndb_writer_note note;
 		struct ndb_writer_profile profile;
 		struct ndb_writer_ndb_meta ndb_meta;
@@ -2237,13 +2264,14 @@ static enum ndb_idres ndb_ingester_json_controller(void *data, const char *hexid
 	unsigned char id[32];
 	struct ndb_ingest_controller *c = data;
 	struct ndb_txn txn;
 
 	hex_decode(hexid, 64, id, sizeof(id));
 
 	// let's see if we already have it
-
 	ndb_txn_from_mdb(&txn, c->lmdb, c->read_txn);
-	if (!ndb_has_note(&txn, id))
+	c->note = ndb_get_note_by_id(&txn, id, NULL, &c->note_key);
+
+	if (c->note == NULL)
 		return NDB_IDRES_CONT;
 
 	return NDB_IDRES_STOP;
@@ -2330,7 +2358,8 @@ int ndb_process_profile_note(struct ndb_note *note,
 }
 
 static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
-				    char *json, unsigned len, unsigned client)
+				    char *json, unsigned len,
+				    unsigned client, const char *relay)
 {
 	struct ndb_ingester_msg msg;
 	msg.type = NDB_INGEST_EVENT;
@@ -2338,14 +2367,22 @@ static int ndb_ingester_queue_event(struct ndb_ingester *ingester,
 	msg.event.json = json;
 	msg.event.len = len;
 	msg.event.client = client;
+	msg.event.relay = relay;
 
 	return threadpool_dispatch(&ingester->tp, &msg);
 }
 
+void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay)
+{
+	meta->client = client;
+	meta->relay = relay;
+}
 
 static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
-			    int len, unsigned client)
+			    int len, struct ndb_ingest_meta *meta)
 {
+	const char *relay = meta->relay;
+
 	// Without this, we get bus errors in the json parser inside when
 	// trying to ingest empty kind 6 reposts... we should probably do fuzz
 	// testing on inputs to the json parser
@@ -2362,7 +2399,15 @@ static int ndb_ingest_event(struct ndb_ingester *ingester, const char *json,
 	if (json_copy == NULL)
 		return 0;
 
-	return ndb_ingester_queue_event(ingester, json_copy, len, client);
+	if (relay != NULL) {
+		relay = strdup(meta->relay);
+		if (relay == NULL) {
+			free(json_copy);
+			return 0;
+		}
+	}
+
+	return ndb_ingester_queue_event(ingester, json_copy, len, meta->client, relay);
 }
 
 
@@ -2370,9 +2415,12 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
 				     struct ndb_note *note,
 				     size_t note_size,
 				     struct ndb_writer_msg *out,
-				     struct ndb_ingester *ingester)
+				     struct ndb_ingester *ingester,
+				     const char *relay)
 {
 	enum ndb_ingest_filter_action action;
+	struct ndb_ingest_meta meta;
+
 	action = NDB_INGEST_ACCEPT;
 
 	if (ingester->filter)
@@ -2410,26 +2458,86 @@ static int ndb_ingester_process_note(secp256k1_context *ctx,
 		out->profile.note.note_len = note_size;
+		out->profile.note.relay = relay;
 		return 1;
 	} else if (note->kind == 6) {
 		// process the repost if we have a repost event
 		ndb_debug("processing kind 6 repost\n");
+		// ndb_ingest_event makes its own copy of the relay string
+		ndb_ingest_meta_init(&meta, 0, relay);
 		ndb_ingest_event(ingester, ndb_note_content(note),
-				 ndb_note_content_length(note), 0);
+				 ndb_note_content_length(note),
+				 &meta);
 	}
 
 	out->type = NDB_WRITER_NOTE;
 	out->note.note = note;
 	out->note.note_len = note_size;
+	out->note.relay = relay;
 
 	return 1;
 }
 
+int ndb_note_has_relay(struct ndb_txn *txn, uint64_t note_key, const char *relay)
+{
+	int rc, success = 0;
+	size_t relay_len = strlen(relay);
+	MDB_val k, v;
+	MDB_cursor *cur;
+	// MDB_SET_KEY positions the cursor on note_key and returns its first
+	// relay. MDB_FIRST_DUP would fail with EINVAL on a fresh cursor.
+	MDB_cursor_op op = MDB_SET_KEY;
+
+	k.mv_data = &note_key;
+	k.mv_size = sizeof(note_key);
+
+	if ((rc = mdb_cursor_open(txn->mdb_txn, txn->lmdb->dbs[NDB_DB_NOTE_RELAYS], &cur))) {
+		ndb_debug("ndb_note_has_relay: failed to open cursor: '%s'\n", mdb_strerror(rc));
+		return 0;
+	}
+
+	while (!mdb_cursor_get(cur, &k, &v, op)) {
+		// relay values are expected to be NUL-terminated urls. check the
+		// size first so we never read past the end of the value
+		if (v.mv_size == relay_len + 1 && !memcmp(relay, v.mv_data, v.mv_size)) {
+			success = 1;
+			goto cleanup;
+		}
+
+		op = MDB_NEXT_DUP;
+	}
+
+cleanup:
+	mdb_cursor_close(cur);
+	return success;
+}
+
+// process the relay for the note. this is called when we already have the
+// note in the database but still need to check if the relay needs to be
+// written to the relay indexes for corresponding note
+static int ndb_process_note_relay(struct ndb_txn *txn, struct ndb_writer_msg *out,
+				  uint64_t note_key, struct ndb_note *note,
+				  const char *relay)
+{
+	// query to see if we already have the relay on this note
+	if (ndb_note_has_relay(txn, note_key, relay)) {
+		return 0;
+	}
+
+	// if not, tell the writer thread to emit a NOTE_RELAY event
+	out->type = NDB_WRITER_NOTE_RELAY;
+
+	out->note_relay.relay = relay;
+	out->note_relay.note_key = note_key;
+	out->note_relay.kind = ndb_note_kind(note);
+	out->note_relay.created_at = ndb_note_created_at(note);
+
+	return 1;
+}
 
 static int ndb_ingester_process_event(secp256k1_context *ctx,
 				      struct ndb_ingester *ingester,
 				      struct ndb_ingester_event *ev,
 				      struct ndb_writer_msg *out,
-				      MDB_txn *read_txn
-				      )
+				      MDB_txn *read_txn)
 {
 	struct ndb_tce tce;
 	struct ndb_fce fce;
@@ -2463,10 +2571,28 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 		ndb_client_event_from_json(ev->json, ev->len, &fce, buf, bufsize, &cb) :
 		ndb_ws_event_from_json(ev->json, ev->len, &tce, buf, bufsize, &cb);
 
+	// This is a result from our special json parser. It parsed the id
+	// and found that we already have it in the database
 	if ((int)note_size == -42) {
-		// we already have this!
-		//ndb_debug("already have id??\n");
-		goto cleanup;
+		struct ndb_txn relay_txn;
+
+		assert(controller.note != NULL);
+		assert(controller.note_key != 0);
+
+		// we still need to process the relays on the note even
+		// if we already have it
+		ndb_txn_from_mdb(&relay_txn, controller.lmdb, controller.read_txn);
+		if (ev->relay && ndb_process_note_relay(&relay_txn, out, controller.note_key,
+							 controller.note, ev->relay))
+		{
+			// free note buf here since we don't pass the note to the writer thread
+			free(buf);
+			goto success;
+		} else {
+			// we already have the note and there are no new
+			// relays to process. nothing to write.
+			goto cleanup;
+		}
 	} else if (note_size == 0) {
 		ndb_debug("failed to parse '%.*s'\n", ev->len, ev->json);
 		goto cleanup;
@@ -2484,13 +2610,12 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 			}
 
 			if (!ndb_ingester_process_note(ctx, note, note_size,
-						       out, ingester)) {
+						       out, ingester,
+						       ev->relay)) {
 				ndb_debug("failed to process note\n");
 				goto cleanup;
 			} else {
-				// we're done with the original json, free it
-				free(ev->json);
-				return 1;
+				goto success;
 			}
 		}
 	} else {
@@ -2507,20 +2632,24 @@ static int ndb_ingester_process_event(secp256k1_context *ctx,
 			}
 
 			if (!ndb_ingester_process_note(ctx, note, note_size,
-						       out, ingester)) {
+						       out, ingester,
+						       ev->relay)) {
 				ndb_debug("failed to process note\n");
 				goto cleanup;
 			} else {
-				// we're done with the original json, free it
-				free(ev->json);
-				return 1;
+				goto success;
 			}
 		}
 	}
 
+success:
+	free(ev->json);
+	// we don't free relay or buf since those are passed to the writer thread
+	return 1;
 
 cleanup:
 	free(ev->json);
+	free((char *)ev->relay);
 	free(buf);
 
 	return ok;
@@ -2628,6 +2757,66 @@ retry:
 	return 1;
 }
 
+//
+// The relay kind index has a layout like so (so we don't need dupsort)
+//
+// - note_id:        00 + 8 bytes
+// - kind:           08 + 8 bytes
+// - created_at:     16 + 8 bytes
+// - relay_url_size: 24 + 1 byte
+// - relay_url:      25 + n byte null-terminated string
+// - pad to 8 byte alignment
+//
+// The key sort order is:
+//
+//  relay_url, kind, created_at, note_id
+//
+static int ndb_relay_kind_cmp(const MDB_val *a, const MDB_val *b)
+{
+	int cmp;
+	uint64_t iva, ivb;
+	MDB_val va, vb;
+
+	va.mv_size = *((unsigned char *)a->mv_data + 24);
+	va.mv_data = (unsigned char *)a->mv_data + 25;
+
+	vb.mv_size = *((unsigned char *)b->mv_data + 24);
+	vb.mv_data = (unsigned char *)b->mv_data + 25;
+
+	cmp = mdb_cmp_memn(&va, &vb);
+	if (cmp) return cmp;
+
+	// kind (memcpy: key data isn't guaranteed to be 8-byte aligned)
+	memcpy(&iva, (unsigned char *)a->mv_data + 8, sizeof(iva));
+	memcpy(&ivb, (unsigned char *)b->mv_data + 8, sizeof(ivb));
+
+	if (iva < ivb)
+		return -1;
+	else if (iva > ivb)
+		return 1;
+
+	// created_at
+	memcpy(&iva, (unsigned char *)a->mv_data + 16, sizeof(iva));
+	memcpy(&ivb, (unsigned char *)b->mv_data + 16, sizeof(ivb));
+
+	if (iva < ivb)
+		return -1;
+	else if (iva > ivb)
+		return 1;
+
+	// note_id. this db isn't dupsort, so without this tie-breaker two
+	// notes with the same relay, kind and created_at would share a key
+	memcpy(&iva, a->mv_data, sizeof(iva));
+	memcpy(&ivb, b->mv_data, sizeof(ivb));
+
+	if (iva < ivb)
+		return -1;
+	else if (iva > ivb)
+		return 1;
+
+	return 0;
+}
+
 static int ndb_search_key_cmp(const MDB_val *a, const MDB_val *b)
 {
 	int cmp;
@@ -4590,6 +4779,7 @@ static void *ndb_writer_thread(void *data)
 			case NDB_WRITER_PROFILE_LAST_FETCH: needs_commit = 1; break;
 			case NDB_WRITER_BLOCKS: needs_commit = 1; break;
 			case NDB_WRITER_MIGRATE: needs_commit = 1; break;
+			case NDB_WRITER_NOTE_RELAY: needs_commit = 1; break;
 			case NDB_WRITER_QUIT: break;
 			}
 		}
@@ -4643,6 +4833,9 @@ static void *ndb_writer_thread(void *data)
 				};
 			}
 			break;
+		case NDB_WRITER_NOTE_RELAY:
+			ndb_write_note_relay(&txn, msg->note_relay);
+			break;
 		case NDB_WRITER_DBMETA:
 			ndb_write_version(&txn, msg->ndb_meta.version);
 			break;
@@ -4928,6 +5121,21 @@ static int ndb_init_lmdb(const char *filename, struct ndb_lmdb *lmdb, size_t map
 		return 0;
 	}
 
+	// relay kind index: relay + kind + created_at + note key.
+	// see ndb_relay_kind_cmp function for more details on the key format
+	if ((rc = mdb_dbi_open(txn, "relay_kind", MDB_CREATE, &lmdb->dbs[NDB_DB_NOTE_RELAY_KIND]))) {
+		fprintf(stderr, "mdb_dbi_open relay_kind, error %d\n", rc);
+		return 0;
+	}
+	mdb_set_compare(txn, lmdb->dbs[NDB_DB_NOTE_RELAY_KIND], ndb_relay_kind_cmp);
+
+	// note key -> relays index. relay urls are variable length, so this
+	// is MDB_DUPSORT without MDB_DUPFIXED
+	if ((rc = mdb_dbi_open(txn, "note_relays", MDB_CREATE | MDB_DUPSORT, &lmdb->dbs[NDB_DB_NOTE_RELAYS]))) {
+		fprintf(stderr, "mdb_dbi_open note_relays, error %d\n", rc);
+		return 0;
+	}
+
 	// id+ts index flags
 	unsigned int tsid_flags = MDB_CREATE | MDB_DUPSORT | MDB_DUPFIXED;
 
@@ -5116,7 +5324,10 @@ void ndb_destroy(struct ndb *ndb)
 // The client-sent variation of ndb_process_event
 int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
 {
-	return ndb_ingest_event(&ndb->ingester, json, len, 1);
+	struct ndb_ingest_meta meta;
+	ndb_ingest_meta_init(&meta, 1, NULL);
+
+	return ndb_ingest_event(&ndb->ingester, json, len, &meta);
 }
 
 // Process anostr event from a relay,
@@ -5138,25 +5349,32 @@ int ndb_process_client_event(struct ndb *ndb, const char *json, int len)
 //
 int ndb_process_event(struct ndb *ndb, const char *json, int json_len)
 {
-	return ndb_ingest_event(&ndb->ingester, json, json_len, 0);
+	struct ndb_ingest_meta meta;
+	ndb_ingest_meta_init(&meta, 0, NULL);
+
+	return ndb_ingest_event(&ndb->ingester, json, json_len, &meta);
 }
 
+int ndb_process_event_with(struct ndb *ndb, const char *json, int json_len,
+			   struct ndb_ingest_meta *meta)
+{
+	return ndb_ingest_event(&ndb->ingester, json, json_len, meta);
+}
 
-int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len, int client)
+int _ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len,
+			struct ndb_ingest_meta *meta)
 {
 	const char *start, *end, *very_end;
 	start = ldjson;
 	end = start + json_len;
 	very_end = ldjson + json_len;
-	int (* process)(struct ndb *, const char *, int);
 #if DEBUG
 	int processed = 0;
 #endif
-	process = client ? ndb_process_client_event : ndb_process_event;
 
 	while ((end = fast_strchr(start, '\n', very_end - start))) {
 		//printf("processing '%.*s'\n", (int)(end-start), start);
-		if (!process(ndb, start, end - start)) {
-			ndb_debug("ndb_process_client_event failed\n");
+		if (!ndb_process_event_with(ndb, start, end - start, meta)) {
+			ndb_debug("ndb_process_event_with failed\n");
 			return 0;
 		}
@@ -5194,14 +5412,26 @@ int ndb_process_events_stream(struct ndb *ndb, FILE* fp)
 }
 #endif
 
+int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len,
+			    struct ndb_ingest_meta *meta)
+{
+	return _ndb_process_events(ndb, ldjson, json_len, meta);
+}
+
 int ndb_process_client_events(struct ndb *ndb, const char *ldjson, size_t json_len)
 {
-	return _ndb_process_events(ndb, ldjson, json_len, 1);
+	struct ndb_ingest_meta meta;
+	ndb_ingest_meta_init(&meta, 1, NULL);
+
+	return _ndb_process_events(ndb, ldjson, json_len, &meta);
 }
 
 int ndb_process_events(struct ndb *ndb, const char *ldjson, size_t json_len)
 {
-	return _ndb_process_events(ndb, ldjson, json_len, 0);
+	struct ndb_ingest_meta meta;
+	ndb_ingest_meta_init(&meta, 0, NULL);
+
+	return _ndb_process_events(ndb, ldjson, json_len, &meta);
 }
 
 static inline int cursor_push_tag(struct cursor *cur, struct ndb_tag *tag)
@@ -7086,6 +7316,10 @@ const char *ndb_db_name(enum ndb_dbs db)
 			return "note_pubkey_index";
 		case NDB_DB_NOTE_PUBKEY_KIND:
 			return "note_pubkey_kind_index";
+		case NDB_DB_NOTE_RELAY_KIND:
+			return "note_relay_kind_index";
+		case NDB_DB_NOTE_RELAYS:
+			return "note_relays";
 		case NDB_DBS:
 			return "count";
 	}
diff --git a/src/nostrdb.h b/src/nostrdb.h
index f4efa91b876f..ab51e1a2e45f 100644
--- a/src/nostrdb.h
+++ b/src/nostrdb.h
@@ -55,6 +55,11 @@ struct ndb_str {
 	};
 };
 
+struct ndb_ingest_meta {
+	unsigned client;    // 1 for client-sent ["EVENT", {...}] messages
+	const char *relay;  // relay url the event came from, or NULL; copied on ingest
+};
+
 struct ndb_keypair {
 	unsigned char pubkey[32];
 	unsigned char secret[32];
@@ -189,6 +194,8 @@ enum ndb_dbs {
 	NDB_DB_NOTE_TAGS,        // note tags index
 	NDB_DB_NOTE_PUBKEY,      // note pubkey index
 	NDB_DB_NOTE_PUBKEY_KIND, // note pubkey kind index
+	NDB_DB_NOTE_RELAY_KIND,  // relay+kind+created -> note_id
+	NDB_DB_NOTE_RELAYS,      // note_id -> relays
 	NDB_DBS,
 };
 
@@ -470,14 +477,23 @@ int ndb_note_verify(void *secp_ctx, unsigned char pubkey[32], unsigned char id[3
 // NDB
 int ndb_init(struct ndb **ndb, const char *dbdir, const struct ndb_config *);
 int ndb_db_version(struct ndb_txn *txn);
+
+// NOTE PROCESSING
 int ndb_process_event(struct ndb *, const char *json, int len);
+void ndb_ingest_meta_init(struct ndb_ingest_meta *meta, unsigned client, const char *relay);
+// Process an event, recording the relay where it came from.
+int ndb_process_event_with(struct ndb *, const char *json, int len, struct ndb_ingest_meta *meta);
 int ndb_process_events(struct ndb *, const char *ldjson, size_t len);
+int ndb_process_events_with(struct ndb *ndb, const char *ldjson, size_t json_len, struct ndb_ingest_meta *meta);
 #ifndef _WIN32
 // TODO: fix on windows
 int ndb_process_events_stream(struct ndb *, FILE* fp);
 #endif
+// deprecated: use ndb_process_event_with
 int ndb_process_client_event(struct ndb *, const char *json, int len);
+// deprecated: use ndb_process_events_with
 int ndb_process_client_events(struct ndb *, const char *json, size_t len);
+
 int ndb_begin_query(struct ndb *, struct ndb_txn *);
 int ndb_search_profile(struct ndb_txn *txn, struct ndb_search *search, const char *query);
 int ndb_search_profile_next(struct ndb_search *search);