diff --git a/Cargo.lock b/Cargo.lock index 6abb6ea6e0af..5a1d8b943bf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2296,7 +2296,7 @@ dependencies = [ [[package]] name = "nostrdb" version = "0.3.4" -source = "git+https://github.com/damus-io/nostrdb-rs?rev=8be0d4972148cc1387ddcaa40b97a924519ba855#8be0d4972148cc1387ddcaa40b97a924519ba855" +source = "git+https://github.com/damus-io/nostrdb-rs?rev=be8969a1c2563f24cecaf6ec2c2b253c9697a577#be8969a1c2563f24cecaf6ec2c2b253c9697a577" dependencies = [ "bindgen", "cc", diff --git a/Cargo.toml b/Cargo.toml index 6847fadf9e95..1921faf51bda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence tracing = "0.1.40" #wasm-bindgen = "0.2.83" -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "8be0d4972148cc1387ddcaa40b97a924519ba855" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "be8969a1c2563f24cecaf6ec2c2b253c9697a577" } #nostrdb = { path = "/Users/jb55/dev/github/damus-io/nostrdb-rs" } #nostrdb = "0.3.4" enostr = { path = "enostr" } diff --git a/enostr/Cargo.toml b/enostr/Cargo.toml index a528e643b6e2..8020366bdcb2 100644 --- a/enostr/Cargo.toml +++ b/enostr/Cargo.toml @@ -11,7 +11,7 @@ serde_derive = "1" serde = { version = "1", features = ["derive"] } # You only need this if you want app persistence serde_json = "1.0.89" nostr = { version = "0.30.0" } -nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "8be0d4972148cc1387ddcaa40b97a924519ba855" } +nostrdb = { git = "https://github.com/damus-io/nostrdb-rs", rev = "be8969a1c2563f24cecaf6ec2c2b253c9697a577" } hex = "0.4.3" tracing = "0.1.40" env_logger = "0.11.1" diff --git a/src/app.rs b/src/app.rs index e7d212d31385..82769d7ddb5d 100644 --- a/src/app.rs +++ b/src/app.rs @@ -97,9 +97,21 @@ fn relay_setup(pool: &mut RelayPool, ctx: &egui::Context) { } } -fn send_initial_timeline_filter(damus: &Damus, timeline: usize) { - match damus.timelines[timeline].filter { +fn send_initial_timeline_filter(damus: &mut Damus, timeline: usize) { + let can_since_optimize = damus.since_optimize; + let mut update_filter_state: Option = None; + + match &damus.timelines[timeline].filter { + FilterState::FetchingRemote(ref unisub) => { + error!("FetchingRemote state when sending initial timeline filter?"); + } + + FilterState::GotRemote(ref sub) => { + error!("GotRemote state when sending initial timeline filter?"); + } + FilterState::Ready(filter) => { + let filter = filter.to_owned(); let new_filters = filter.into_iter().map(|f| { // limit the size of remote filters let default_limit = filter::default_remote_limit(); @@ -139,27 +151,29 @@ fn send_initial_timeline_filter(damus: &Damus, timeline: usize) { FilterState::NeedsRemote(filter) => { let sub_id = Uuid::new_v4().to_string(); let uid = damus.timelines[timeline].uid; - let local_sub = damus.ndb.subscribe(filter).expect("sub"); + let local_sub = damus.ndb.subscribe(filter.clone()).expect("sub"); - damus.timelines[timeline].filter = - FilterState::fetching_remote(sub_id.clone(), local_sub); + update_filter_state = Some(FilterState::fetching_remote(sub_id.clone(), local_sub)); damus .subscriptions() .insert(sub_id.clone(), SubKind::FetchingContactList(uid)); - damus.pool.subscribe(sub_id, filter); + damus.pool.subscribe(sub_id, filter.to_owned()); } } + + if let Some(new_filter_state) = update_filter_state { + damus.timelines[timeline].filter = new_filter_state; + } } fn send_initial_filters(damus: &mut Damus, relay_url: &str) { info!("Sending initial filters to {}", relay_url); let mut c: u32 = 1; - let can_since_optimize = damus.since_optimize; let timelines = damus.timelines.len(); for i in 0..timelines { - send_timeline_filter(i); + send_initial_timeline_filter(damus, i); } } @@ -381,7 +395,7 @@ fn setup_profiling() { puffin::set_scopes_on(true); // tell puffin to collect data } -fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: Vec) { +fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: Vec) -> Result<()> { damus.timelines[timeline].subscription = Some(damus.ndb.subscribe(filters.clone())?); let txn = Transaction::new(&damus.ndb)?; debug!( @@ -414,15 +428,25 @@ fn setup_initial_timeline(damus: &mut Damus, timeline: usize, filters: Vec Result<()> { +fn setup_initial_nostrdb_subs(damus: &mut Damus) -> Result<()> { let timelines = damus.timelines.len(); for i in 0..timelines { match &damus.timelines[i].filter { - FilterState::Ready(filters) => setup_initial_timeline(damus, i, filters.to_owned()), - - FilterState::NeedsRemote(filters) => {} + FilterState::Ready(filters) => setup_initial_timeline(damus, i, filters.to_owned())?, + FilterState::FetchingRemote(_) => { + error!("FetchingRemote state in setup_initial_nostr_subs") + } + FilterState::GotRemote(_) => { + error!("GotRemote state in setup_initial_nostr_subs") + } + FilterState::NeedsRemote(_filters) => { + // can't do anything yet, we defer to first connect to send + // remote filters + } } } @@ -520,10 +544,10 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { sub_kind } else { warn!("got unknown eose subid {}", subid); - return; + return Ok(()); }; - match sub_kind { + match *sub_kind { SubKind::Initial => { let txn = Transaction::new(&damus.ndb)?; let ids = get_unknown_ids(&txn, damus)?; @@ -532,7 +556,8 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { let sub_id = Uuid::new_v4().to_string(); let msg = ClientMessage::req(sub_id.clone(), filters); - damus.subscriptions().insert(sub_id, SubKind::UnknownIds) + // unknownids are a oneshot request + damus.subscriptions().insert(sub_id, SubKind::OneShot); damus.pool.send_to(&msg, relay_url); } } @@ -544,21 +569,29 @@ fn handle_eose(damus: &mut Damus, subid: &str, relay_url: &str) -> Result<()> { } SubKind::FetchingContactList(timeline_uid) => { - let timeline_ind = if let Some(i) = damus.find_timeline(timeline_ind) { + let timeline_ind = if let Some(i) = damus.find_timeline(timeline_uid) { i } else { - error!("timeline uid:{} not found for FetchingContactList", timeline_uid); - return; + error!( + "timeline uid:{} not found for FetchingContactList", + timeline_uid + ); + return Ok(()); }; - let local_sub = if let FilterState::FetchingRemote(unisub) = damus.timelines[timeline_ind].filter { - unisub.local + let local_sub = if let FilterState::FetchingRemote(unisub) = + &damus.timelines[timeline_ind].filter + { + unisub.local.id } else { - error!("Expected timeline to have FetchingRemote state but was {:?}", damus.timelines[timeline_ind].filter); - return; + error!( + "Expected timeline to have FetchingRemote state but was {:?}", + damus.timelines[timeline_ind].filter + ); + return Ok(()); }; - let local_subid = local_sub.id; + let local_subid = local_sub; damus.timelines[timeline_ind].filter = FilterState::got_remote(local_sub); // see if we're fast enough to catch a processed contact list @@ -685,15 +718,18 @@ impl Damus { .as_ref() .map(|a| a.pubkey.bytes()); let ndb = Ndb::new(&dbpath, &config).expect("ndb"); - let timelines = parsed_args - .columns - .into_iter() - .map(|c| c.into_timeline(&ndb, account)) - .collect(); + + let mut timelines: Vec = Vec::with_capacity(parsed_args.columns.len()); + for col in parsed_args.columns { + if let Some(timeline) = col.into_timeline(&ndb, account) { + timelines.push(timeline); + } + } Self { pool, is_mobile, + subscriptions: Subscriptions::default(), since_optimize: parsed_args.since_optimize, threads: Threads::default(), drafts: Drafts::default(), @@ -727,6 +763,7 @@ impl Damus { config.set_ingester_threads(2); Self { is_mobile, + subscriptions: Subscriptions::default(), since_optimize: true, threads: Threads::default(), drafts: Drafts::default(), @@ -748,7 +785,7 @@ impl Damus { pub fn find_timeline(&self, uid: u32) -> Option { let mut i = 0usize; - for timeline in self.timelines { + for timeline in &self.timelines { if timeline.uid == uid { return Some(i); } diff --git a/src/args.rs b/src/args.rs index 28543416496b..bdd39bf1181e 100644 --- a/src/args.rs +++ b/src/args.rs @@ -1,6 +1,6 @@ use crate::column::{ColumnKind, PubkeySource}; -use crate::timeline::Timeline; use crate::filter::FilterState; +use crate::timeline::Timeline; use enostr::{Filter, Keypair, Pubkey, SecretKey}; use nostrdb::Ndb; use tracing::{error, info}; @@ -177,11 +177,12 @@ pub enum ArgColumn { } impl ArgColumn { - pub fn into_timeline(self, ndb: &Ndb, user: Option<&[u8; 32]>) -> Timeline { + pub fn into_timeline(self, ndb: &Ndb, user: Option<&[u8; 32]>) -> Option { match self { - ArgColumn::Generic(filters) => { - Timeline::new(ColumnKind::Generic, FilterState::ready(filters)) - } + ArgColumn::Generic(filters) => Some(Timeline::new( + ColumnKind::Generic, + FilterState::ready(filters), + )), ArgColumn::Column(ck) => ck.into_timeline(ndb, user), } } diff --git a/src/column.rs b/src/column.rs index 48f80ef9a495..a9a3a3e62217 100644 --- a/src/column.rs +++ b/src/column.rs @@ -1,8 +1,9 @@ -use crate::{timeline::Timeline, Error}; use crate::filter::FilterState; +use crate::{timeline::Timeline, Error}; use enostr::Pubkey; use nostrdb::{Filter, Ndb, Transaction}; use std::fmt::Display; +use tracing::{error, warn}; #[derive(Clone, Debug)] pub enum PubkeySource { @@ -46,12 +47,16 @@ impl ColumnKind { ColumnKind::List(ListKind::Contact(pk)) } - pub fn into_timeline(self, ndb: &Ndb, default_user: Option<&[u8; 32]>) -> Timeline { + pub fn into_timeline(self, ndb: &Ndb, default_user: Option<&[u8; 32]>) -> Option { match self { - ColumnKind::Universe => Timeline::new(ColumnKind::Universe, FilterState::ready(vec![])), + ColumnKind::Universe => Some(Timeline::new( + ColumnKind::Universe, + FilterState::ready(vec![]), + )), ColumnKind::Generic => { - panic!("you can't convert a ColumnKind::Generic to a Timeline") + warn!("you can't convert a ColumnKind::Generic to a Timeline"); + None } ColumnKind::List(ListKind::Contact(ref pk_src)) => { @@ -60,33 +65,38 @@ impl ColumnKind { if let Some(user_pk) = default_user { user_pk } else { - // No user loaded, so we have to return an unloaded - // contact list columns - return Timeline::new( - ColumnKind::contact_list(PubkeySource::DeckAuthor), - FilterState::needs_remote(), - ); + // No user loaded, and pk source needs a user, + // we can't do much here, so just return no timeline + return None; } } PubkeySource::Explicit(pk) => pk.bytes(), }; let contact_filter = Filter::new().authors([pk]).kinds([3]).limit(1).build(); + let txn = Transaction::new(ndb).expect("txn"); let results = ndb - .query(&txn, vec![contact_filter], 1) + .query(&txn, vec![contact_filter.clone()], 1) .expect("contact query failed?"); if results.is_empty() { - return Timeline::new(ColumnKind::contact_list(pk_src.to_owned()), FilterState::needs_remote()); + return Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter.clone()]), + )); } match Timeline::contact_list(&results[0].note) { - Err(Error::EmptyContactList) => { - Timeline::new(ColumnKind::contact_list(pk_src.to_owned()), FilterState::needs_remote()) + Err(Error::EmptyContactList) => Some(Timeline::new( + ColumnKind::contact_list(pk_src.to_owned()), + FilterState::needs_remote(vec![contact_filter]), + )), + Err(e) => { + error!("Unexpected error: {e}"); + None } - Err(e) => panic!("Unexpected error: {e}"), - Ok(tl) => tl, + Ok(tl) => Some(tl), } } } diff --git a/src/filter.rs b/src/filter.rs index 85c7f5f3dca4..e494e711aff3 100644 --- a/src/filter.rs +++ b/src/filter.rs @@ -7,8 +7,8 @@ use tracing::{debug, warn}; /// tracks data received remotely, and local #[derive(Debug)] pub struct UnifiedSubscription { - local: Subscription, - remote: String, + pub local: Subscription, + pub remote: String, } /// We may need to fetch some data from relays before our filter is ready. @@ -17,7 +17,7 @@ pub struct UnifiedSubscription { pub enum FilterState { NeedsRemote(Vec), FetchingRemote(UnifiedSubscription), - GotRemote(), + GotRemote(u64), Ready(Vec), } @@ -37,8 +37,8 @@ impl FilterState { /// We got the remote data. Local data should be available to build /// the filter for the [`FilterState::Ready`] state - pub fn got_remote(local_sub: Subscription) -> Self { - Self::GotRemote(local_sub) + pub fn got_remote(local_sub_id: u64) -> Self { + Self::GotRemote(local_sub_id) } /// We have sent off a remote subscription to get data needed for the diff --git a/src/subscriptions.rs b/src/subscriptions.rs index 0fa7ffe2a646..df71cc0dc025 100644 --- a/src/subscriptions.rs +++ b/src/subscriptions.rs @@ -17,6 +17,7 @@ pub enum SubKind { /// Subscriptions that need to be tracked at various stages. Sometimes we /// need to do A, then B, then C. Tracking requests at various stages by /// mapping uuid subids to explicit states happens here. +#[derive(Default)] pub struct Subscriptions { pub subs: HashMap, }