diff --git a/graph/examples/append_row.rs b/graph/examples/append_row.rs index f272c07cf82..811484a5c02 100644 --- a/graph/examples/append_row.rs +++ b/graph/examples/append_row.rs @@ -104,7 +104,7 @@ pub fn main() -> anyhow::Result<()> { }; mods.push(md); } - let mut group = RowGroup::new(THING_TYPE.clone(), false); + let mut group = RowGroup::new(THING_TYPE.clone()); let start = Instant::now(); for md in mods { diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 8c9671cb347..0f8ba70251e 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -4,6 +4,8 @@ use std::{ sync::Arc, }; +use slog::Logger; + use crate::{ blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime}, cheap_clone::CheapClone, @@ -338,24 +340,22 @@ impl LastMod { #[derive(Debug, CacheWeight)] pub struct RowGroup { pub entity_type: EntityType, + /// All changes for this entity type, ordered by block; i.e., if `i < j` /// then `rows[i].block() <= rows[j].block()`. Several methods on this /// struct rely on the fact that this ordering is observed. rows: Vec, - immutable: bool, - /// Map the `key.entity_id` of all entries in `rows` to the index with /// the most recent entry for that id to speed up lookups. last_mod: LastMod, } impl RowGroup { - pub fn new(entity_type: EntityType, immutable: bool) -> Self { + pub fn new(entity_type: EntityType) -> Self { Self { entity_type, rows: Vec::new(), - immutable, last_mod: LastMod::new(), } } @@ -471,7 +471,7 @@ impl RowGroup { /// Append `row` to `self.rows` by combining it with a previously /// existing row, if that is possible fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> { - if self.immutable { + if self.entity_type.is_immutable() { match row { EntityModification::Insert { .. } => { // Check if this is an attempt to overwrite an immutable @@ -488,6 +488,9 @@ impl RowGroup { .and_then(|&idx| self.rows.get(idx)) { Some(prev) if prev.block() != row.block() => { + if self.entity_type.skip_duplicates() { + return Ok(()); + } return Err(StoreError::Input( format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}", row.key(), row.block(), prev.block()))); @@ -498,6 +501,9 @@ impl RowGroup { self.push_row(row); } EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => { + if self.entity_type.skip_duplicates() { + return Ok(()); + } return Err(internal_error!( "immutable entity type {} only allows inserts, not {:?}", self.entity_type, @@ -604,8 +610,8 @@ impl RowGroup { pub struct RowGroupForPerfTest(RowGroup); impl RowGroupForPerfTest { - pub fn new(entity_type: EntityType, immutable: bool) -> Self { - Self(RowGroup::new(entity_type, immutable)) + pub fn new(entity_type: EntityType) -> Self { + Self(RowGroup::new(entity_type)) } pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> { @@ -661,11 +667,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> { #[derive(Debug, CacheWeight)] pub struct RowGroups { pub groups: Vec, + logger: Logger, } impl RowGroups { - fn new() -> Self { - Self { groups: Vec::new() } + fn new(logger: Logger) -> Self { + Self { + groups: Vec::new(), + logger, + } } fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> { @@ -684,9 +694,7 @@ impl RowGroups { match pos { Some(pos) => &mut self.groups[pos], None => { - let immutable = entity_type.is_immutable(); - self.groups - .push(RowGroup::new(entity_type.clone(), immutable)); + self.groups.push(RowGroup::new(entity_type.clone())); // unwrap: we just pushed an entry self.groups.last_mut().unwrap() } @@ -784,6 +792,7 @@ impl Batch { deterministic_errors: Vec, offchain_to_remove: Vec, is_non_fatal_errors_active: bool, + logger: Logger, ) -> Result { let block = block_ptr.number; @@ -797,7 +806,7 @@ impl Batch { EntityModification::Remove { .. } => 0, }); - let mut mods = RowGroups::new(); + let mut mods = RowGroups::new(logger); for m in raw_mods { mods.group_entry(&m.key().entity_type).push(m, block)?; @@ -1079,7 +1088,6 @@ mod test { let group = RowGroup { entity_type: ENTRY_TYPE.clone(), rows, - immutable: false, last_mod, }; let act = group @@ -1120,6 +1128,8 @@ mod test { type Thing @entity { id: ID!, count: Int! } type RowGroup @entity { id: ID! } type Entry @entity { id: ID! } + type ImmThing @entity(immutable: true) { id: ID!, count: Int! } + type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! } "#; lazy_static! { static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap(); @@ -1128,6 +1138,8 @@ mod test { static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap(); static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap(); static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap(); + static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap(); + static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap(); } /// Convenient notation for changes to a fixed entity @@ -1187,7 +1199,7 @@ mod test { impl Group { fn new() -> Self { Self { - group: RowGroup::new(THING_TYPE.clone(), false), + group: RowGroup::new(THING_TYPE.clone()), } } @@ -1292,4 +1304,76 @@ mod test { let op = group.last_op(&key, 0); assert_eq!(None, op); } + + fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification { + EntityModification::Insert { + key: entity_type.parse_key(id).unwrap(), + data: Arc::new(entity! { SCHEMA => id: id, count: block }), + block, + end: None, + } + } + + fn make_overwrite( + entity_type: &EntityType, + id: &str, + block: BlockNumber, + ) -> EntityModification { + EntityModification::Overwrite { + key: entity_type.parse_key(id).unwrap(), + data: Arc::new(entity! { SCHEMA => id: id, count: block }), + block, + end: None, + } + } + + fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification { + EntityModification::Remove { + key: entity_type.parse_key(id).unwrap(), + block, + } + } + + #[test] + fn append_row_immutable_default_rejects_cross_block_duplicate() { + let mut group = RowGroup::new(IMM_THING_TYPE.clone()); + let res = group.push(make_insert(&IMM_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&IMM_THING_TYPE, "one", 2), 2); + assert!(res.is_err()); + } + + #[test] + fn append_row_skip_duplicates_allows_cross_block_duplicate() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 2), 2); + assert!(res.is_ok()); + assert_eq!(group.row_count(), 1); + } + + #[test] + fn append_row_skip_duplicates_allows_overwrite() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone()); + let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1)); + assert!(res.is_ok()); + } + + #[test] + fn append_row_skip_duplicates_allows_remove() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone()); + let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1)); + assert!(res.is_ok()); + } + + #[test] + fn append_row_skip_duplicates_same_block_still_pushes() { + let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1); + assert!(res.is_ok()); + assert_eq!(group.row_count(), 2); + } } diff --git a/graph/src/schema/entity_type.rs b/graph/src/schema/entity_type.rs index deb3cb3d8ef..c46f68681c9 100644 --- a/graph/src/schema/entity_type.rs +++ b/graph/src/schema/entity_type.rs @@ -68,6 +68,10 @@ impl EntityType { self.schema.is_immutable(self.atom) } + pub fn skip_duplicates(&self) -> bool { + self.schema.skip_duplicates(self.atom) + } + pub fn id_type(&self) -> Result { self.schema.id_type(self.atom) } diff --git a/graph/src/schema/input/mod.rs b/graph/src/schema/input/mod.rs index 2ec52fe1762..ef241ff831f 100644 --- a/graph/src/schema/input/mod.rs +++ b/graph/src/schema/input/mod.rs @@ -42,6 +42,7 @@ pub mod kw { pub const ENTITY: &str = "entity"; pub const IMMUTABLE: &str = "immutable"; pub const TIMESERIES: &str = "timeseries"; + pub const SKIP_DUPLICATES: &str = "skipDuplicates"; pub const TIMESTAMP: &str = "timestamp"; pub const AGGREGATE: &str = "aggregate"; pub const AGGREGATION: &str = "aggregation"; @@ -125,12 +126,27 @@ impl TypeInfo { fn is_immutable(&self) -> bool { match self { - TypeInfo::Object(obj_type) => obj_type.immutable, + TypeInfo::Object(obj_type) => { + matches!(obj_type.mutability, ObjectMutability::Immutable { .. }) + } TypeInfo::Interface(_) => false, TypeInfo::Aggregation(_) => true, } } + fn skip_duplicates(&self) -> bool { + match self { + TypeInfo::Object(obj_type) => matches!( + obj_type.mutability, + ObjectMutability::Immutable { + skip_duplicates: true + } + ), + TypeInfo::Interface(_) => false, + TypeInfo::Aggregation(_) => false, + } + } + fn kind(&self) -> TypeKind { match self { TypeInfo::Object(_) => TypeKind::Object, @@ -409,7 +425,7 @@ pub struct ObjectType { pub name: Atom, pub id_type: IdType, pub fields: Box<[Field]>, - pub immutable: bool, + pub mutability: ObjectMutability, /// The name of the aggregation to which this object type belongs if it /// is part of an aggregation aggregation: Option, @@ -448,16 +464,28 @@ impl ObjectType { None => false, _ => unreachable!("validations ensure we don't get here"), }; - let immutable = match dir.argument("immutable") { + let immutable = match dir.argument(kw::IMMUTABLE) { Some(Value::Boolean(im)) => *im, None => timeseries, _ => unreachable!("validations ensure we don't get here"), }; + + let mutability = if immutable { + let skip_duplicates = match dir.argument(kw::SKIP_DUPLICATES) { + Some(Value::Boolean(sd)) => *sd, + _ => false, + }; + + ObjectMutability::Immutable { skip_duplicates } + } else { + ObjectMutability::Mutable + }; + Self { name, fields, id_type, - immutable, + mutability, aggregation: None, timeseries, interfaces, @@ -488,7 +516,7 @@ impl ObjectType { name, interfaces: Box::new([]), id_type: IdType::String, - immutable: false, + mutability: ObjectMutability::Mutable, aggregation: None, timeseries: false, fields, @@ -506,6 +534,12 @@ impl ObjectType { } } +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum ObjectMutability { + Mutable, + Immutable { skip_duplicates: bool }, +} + #[derive(PartialEq, Debug)] pub struct InterfaceType { pub name: Atom, @@ -886,7 +920,9 @@ impl Aggregation { .cloned() .chain(aggregates.iter().map(Aggregate::as_agg_field)) .collect(), - immutable: true, + mutability: ObjectMutability::Immutable { + skip_duplicates: false, + }, aggregation: Some(name), timeseries: false, interfaces: Box::new([]), @@ -1203,6 +1239,13 @@ impl InputSchema { .unwrap_or(false) } + pub(in crate::schema) fn skip_duplicates(&self, entity_type: Atom) -> bool { + self.type_info(entity_type) + .ok() + .map(|ti| ti.skip_duplicates()) + .unwrap_or(false) + } + /// Return true if `type_name` is the name of an object or interface type pub fn is_reference(&self, type_name: &str) -> bool { self.inner @@ -1344,7 +1387,7 @@ impl InputSchema { TypeInfo::Object(obj_type) => Some(obj_type), TypeInfo::Interface(_) | TypeInfo::Aggregation(_) => None, }) - .filter(|obj_type| obj_type.immutable) + .filter(|obj_type| matches!(obj_type.mutability, ObjectMutability::Immutable { .. })) .map(|obj_type| EntityType::new(self.cheap_clone(), obj_type.name)) } @@ -2050,6 +2093,15 @@ mod validations { Ok(b) => b.unwrap_or(timeseries), Err(e) => return Some(e), }; + let skip_duplicates = match bool_arg(dir, kw::SKIP_DUPLICATES) { + Ok(b) => b.unwrap_or(false), + Err(e) => return Some(e), + }; + if skip_duplicates && !immutable { + return Some(SchemaValidationError::SkipDuplicatesRequiresImmutable( + object_type.name.clone(), + )); + } if timeseries { if !immutable { Some(SchemaValidationError::MutableTimeseries( diff --git a/graph/src/schema/mod.rs b/graph/src/schema/mod.rs index 1e40299df63..52a440fc04f 100644 --- a/graph/src/schema/mod.rs +++ b/graph/src/schema/mod.rs @@ -148,6 +148,8 @@ pub enum SchemaValidationError { AggregationDerivedField(String, String), #[error("Timeseries {0} is marked as mutable, it must be immutable")] MutableTimeseries(String), + #[error("Entity type `{0}` has skipDuplicates: true but is not immutable; skipDuplicates requires immutable: true")] + SkipDuplicatesRequiresImmutable(String), #[error("Timeseries {0} is missing a `timestamp` field")] TimeseriesMissingTimestamp(String), #[error("Type {0} has a `timestamp` field of type {1}, but it must be of type Timestamp")] diff --git a/graph/src/schema/test_schemas/skip_duplicates_not_a_boolean.graphql b/graph/src/schema/test_schemas/skip_duplicates_not_a_boolean.graphql new file mode 100644 index 00000000000..ed22d3e8eb8 --- /dev/null +++ b/graph/src/schema/test_schemas/skip_duplicates_not_a_boolean.graphql @@ -0,0 +1,5 @@ +# fail: EntityDirectiveNonBooleanArgValue("skipDuplicates") + +type Data @entity(immutable: true, skipDuplicates: "yes") { + id: Bytes! +} diff --git a/graph/src/schema/test_schemas/skip_duplicates_requires_immutable.graphql b/graph/src/schema/test_schemas/skip_duplicates_requires_immutable.graphql new file mode 100644 index 00000000000..2eeaf6cff09 --- /dev/null +++ b/graph/src/schema/test_schemas/skip_duplicates_requires_immutable.graphql @@ -0,0 +1,5 @@ +# fail: SkipDuplicatesRequiresImmutable("Data") + +type Data @entity(skipDuplicates: true) { + id: Bytes! +} diff --git a/graph/src/schema/test_schemas/skip_duplicates_valid.graphql b/graph/src/schema/test_schemas/skip_duplicates_valid.graphql new file mode 100644 index 00000000000..cdf6ee29b23 --- /dev/null +++ b/graph/src/schema/test_schemas/skip_duplicates_valid.graphql @@ -0,0 +1,5 @@ +# valid: Minimal example + +type Data @entity(immutable: true, skipDuplicates: true) { + id: Bytes! +} diff --git a/graph/src/util/cache_weight.rs b/graph/src/util/cache_weight.rs index 077db9a51ce..b560906fdc0 100644 --- a/graph/src/util/cache_weight.rs +++ b/graph/src/util/cache_weight.rs @@ -188,6 +188,12 @@ impl CacheWeight for EntityType { } } +impl CacheWeight for slog::Logger { + fn indirect_weight(&self) -> usize { + 0 + } +} + impl CacheWeight for [u8; 32] { fn indirect_weight(&self) -> usize { 0 diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d361bbe9c56..3f82149eca7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -258,6 +258,15 @@ importers: specifier: 0.31.0 version: 0.31.0 + tests/runner-tests/skip-duplicates: + devDependencies: + '@graphprotocol/graph-cli': + specifier: 0.60.0 + version: 0.60.0(@types/node@24.3.0)(bufferutil@4.0.9)(encoding@0.1.13)(node-fetch@2.7.0(encoding@0.1.13))(typescript@5.9.2)(utf-8-validate@5.0.10) + '@graphprotocol/graph-ts': + specifier: 0.31.0 + version: 0.31.0 + tests/runner-tests/typename: devDependencies: '@graphprotocol/graph-cli': @@ -2678,11 +2687,6 @@ packages: engines: {node: '>=10'} hasBin: true - semver@7.6.3: - resolution: {integrity: sha512-oVekP1cKtI+CTDvHWYFUcMtsK/00wmAEfyqKfNdARm8u1wNVhSgaX7A8d4UuIlUI5e84iEwOhs7ZPYRmzU9U6A==} - engines: {node: '>=10'} - hasBin: true - semver@7.7.3: resolution: {integrity: sha512-SdsKMrI9TdgjdweUSR9MweHA4EJ8YxHn8DFaDisvhVlUOe4BF1tLD7GAj0lIqWVl+dPb/rExr0Btby5loQm20Q==} engines: {node: '>=10'} @@ -3739,7 +3743,7 @@ snapshots: chalk: 4.1.2 clean-stack: 3.0.1 cli-progress: 3.12.0 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) ejs: 3.1.10 get-package-type: 0.1.0 globby: 11.1.0 @@ -3812,7 +3816,7 @@ snapshots: chalk: 4.1.2 clean-stack: 3.0.1 cli-progress: 3.12.0 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) ejs: 3.1.10 fs-extra: 9.1.0 get-package-type: 0.1.0 @@ -3824,7 +3828,7 @@ snapshots: natural-orderby: 2.0.3 object-treeify: 1.1.33 password-prompt: 1.1.3 - semver: 7.6.3 + semver: 7.7.4 string-width: 4.2.3 strip-ansi: 6.0.1 supports-color: 8.1.1 @@ -3886,7 +3890,7 @@ snapshots: dependencies: '@oclif/core': 2.16.0(@types/node@24.3.0)(typescript@5.9.2) chalk: 4.1.2 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) transitivePeerDependencies: - '@swc/core' - '@swc/wasm' @@ -4583,11 +4587,9 @@ snapshots: optionalDependencies: supports-color: 8.1.1 - debug@4.4.1(supports-color@8.1.1): + debug@4.4.1: dependencies: ms: 2.1.3 - optionalDependencies: - supports-color: 8.1.1 debug@4.4.3(supports-color@8.1.1): dependencies: @@ -4664,7 +4666,7 @@ snapshots: dns-over-http-resolver@1.2.3(node-fetch@2.7.0(encoding@0.1.13)): dependencies: - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) native-fetch: 3.0.0(node-fetch@2.7.0(encoding@0.1.13)) receptacle: 1.3.2 transitivePeerDependencies: @@ -5272,7 +5274,7 @@ snapshots: any-signal: 2.1.2 blob-to-it: 1.0.4 browser-readablestream-to-it: 1.0.3 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.3(supports-color@8.1.1) err-code: 3.0.1 ipfs-core-types: 0.9.0(node-fetch@2.7.0(encoding@0.1.13)) ipfs-unixfs: 6.0.9 @@ -5301,7 +5303,7 @@ snapshots: '@ipld/dag-pb': 2.1.18 abort-controller: 3.0.0 any-signal: 2.1.2 - debug: 4.4.1(supports-color@8.1.1) + debug: 4.4.1 err-code: 3.0.1 ipfs-core-types: 0.9.0(node-fetch@2.7.0(encoding@0.1.13)) ipfs-core-utils: 0.13.0(encoding@0.1.13)(node-fetch@2.7.0(encoding@0.1.13)) @@ -6170,8 +6172,6 @@ snapshots: dependencies: lru-cache: 6.0.0 - semver@7.6.3: {} - semver@7.7.3: {} semver@7.7.4: {} diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index efdfe6a5d8d..f5f9ab9c6dd 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -44,7 +44,7 @@ use graph::schema::{ AggregationInterval, EntityKey, EntityType, Field, FulltextConfig, FulltextDefinition, InputSchema, }; -use graph::slog::warn; +use graph::slog::{debug, warn}; use index::IndexList; use inflector::Inflector; use itertools::Itertools; @@ -373,6 +373,7 @@ impl Layout { position: position as u32, is_account_like: false, immutable: false, + skip_duplicates: false, has_causality_region: false, } } @@ -769,44 +770,60 @@ impl Layout { // We insert the entities in chunks to make sure each operation does // not exceed the maximum number of bindings allowed in queries let chunk_size = InsertQuery::chunk_size(table); + let mut affected_rows: usize = 0; + let mut expected_rows: usize = 0; for chunk in group.write_chunks(chunk_size) { // Empty chunks would lead to invalid SQL if !chunk.is_empty() { - if let Err(e) = InsertQuery::new(table, &chunk)?.execute(conn).await { - // We occasionally get these errors but it's entirely - // unclear what causes them. We work around that by - // switching to row-by-row inserts until we can figure - // out what the underlying cause is - let err_msg = e.to_string(); - if !err_msg.contains("value too large to transmit") { - let (block, msg) = chunk_details(&chunk); - return Err(StoreError::write_failure( - e, - table.object.as_str(), - block, - msg, - )); + expected_rows += chunk.len(); + match InsertQuery::new(table, &chunk)?.execute(conn).await { + Ok(count) => { + affected_rows += count; } - let (block, msg) = chunk_details(&chunk); - warn!(logger, "Insert of entire chunk failed. Trying row by row insert."; - "table" => table.object.as_str(), - "block" => block, - "error" => err_msg, - "details" => msg - ); - for single_chunk in chunk.as_single_writes() { - InsertQuery::new(table, &single_chunk)? - .execute(conn) - .await - .map_err(|e| { - let (block, msg) = chunk_details(&single_chunk); - let msg = format!("{}: offending row {:?}", msg, single_chunk); - StoreError::write_failure(e, table.object.as_str(), block, msg) - })?; + Err(e) => { + // We occasionally get these errors but it's entirely + // unclear what causes them. We work around that by + // switching to row-by-row inserts until we can figure + // out what the underlying cause is + let err_msg = e.to_string(); + if !err_msg.contains("value too large to transmit") { + let (block, msg) = chunk_details(&chunk); + return Err(StoreError::write_failure( + e, + table.object.as_str(), + block, + msg, + )); + } + let (block, msg) = chunk_details(&chunk); + warn!(logger, "Insert of entire chunk failed. Trying row by row insert."; + "table" => table.object.as_str(), + "block" => block, + "error" => err_msg, + "details" => msg + ); + for single_chunk in chunk.as_single_writes() { + let count = InsertQuery::new(table, &single_chunk)? + .execute(conn) + .await + .map_err(|e| { + let (block, msg) = chunk_details(&single_chunk); + let msg = format!("{}: offending row {:?}", msg, single_chunk); + StoreError::write_failure(e, table.object.as_str(), block, msg) + })?; + affected_rows += count; + } } } } } + if affected_rows < expected_rows && table.immutable && table.skip_duplicates { + debug!(logger, "Cross-batch duplicate inserts skipped by ON CONFLICT DO NOTHING"; + "entity_type" => table.object.as_str(), + "expected_rows" => expected_rows, + "affected_rows" => affected_rows, + "skipped" => expected_rows - affected_rows); + } Ok(()) } @@ -949,6 +966,9 @@ impl Layout { ) -> Result { let table = self.table_for_entity(&group.entity_type)?; if table.immutable && group.has_clamps() { + if table.skip_duplicates { + return Ok(0); + } let ids = group .ids() .map(|id| id.to_string()) @@ -1014,6 +1034,9 @@ impl Layout { let table = self.table_for_entity(&group.entity_type)?; if table.immutable { + if table.skip_duplicates { + return Ok(0); + } return Err(internal_error!( "entities of type `{}` can not be deleted since they are immutable. Entity ids are [{}]", table.object, group.ids().join(", ") @@ -1662,6 +1685,8 @@ pub struct Table { /// deleted pub(crate) immutable: bool, + pub(crate) skip_duplicates: bool, + /// Whether this table has an explicit `causality_region` column. If `false`, then the column is /// not present and the causality region for all rows is implicitly `0` (equivalent to CasualityRegion::ONCHAIN). pub(crate) has_causality_region: bool, @@ -1692,6 +1717,7 @@ impl Table { .collect::, StoreError>>()?; let qualified_name = SqlName::qualified_name(&catalog.site.namespace, &table_name); let immutable = defn.is_immutable(); + let skip_duplicates = defn.skip_duplicates(); let nsp = catalog.site.namespace.clone(); let table = Table { object: defn.cheap_clone(), @@ -1705,6 +1731,7 @@ impl Table { columns, position, immutable, + skip_duplicates, has_causality_region, }; Ok(table) @@ -1722,6 +1749,7 @@ impl Table { is_account_like: self.is_account_like, position: self.position, immutable: self.immutable, + skip_duplicates: self.skip_duplicates, has_causality_region: self.has_causality_region, }; diff --git a/store/postgres/src/relational/query_tests.rs b/store/postgres/src/relational/query_tests.rs index 1b68ae5d0cc..63ccb5e2ea0 100644 --- a/store/postgres/src/relational/query_tests.rs +++ b/store/postgres/src/relational/query_tests.rs @@ -2,7 +2,9 @@ use std::{collections::BTreeSet, sync::Arc}; use diesel::{debug_query, pg::Pg}; use graph::{ + components::store::write::RowGroup, data_source::CausalityRegion, + entity, prelude::{r, serde_json as json, DeploymentHash, EntityFilter}, schema::InputSchema, }; @@ -11,7 +13,7 @@ use crate::{ block_range::BoundSide, layout_for_tests::{make_dummy_site, Namespace}, relational::{Catalog, ColumnType, Layout}, - relational_queries::{FindRangeQuery, FromColumnValue}, + relational_queries::{FindRangeQuery, FromColumnValue, InsertQuery}, }; use crate::relational_queries::Filter; @@ -197,3 +199,63 @@ fn test_id_type_casting(table: &crate::relational::Table, expected_cast: &str, t sql ); } + +fn insert_sql_for_schema(gql: &str, entity_type_name: &str) -> String { + use graph::components::store::write::EntityModification; + + let layout = test_layout(gql); + let schema = &layout.input_schema; + let et = schema.entity_type(entity_type_name).unwrap(); + let table = layout.table_for_entity(&et).unwrap(); + + let mut entity = entity! { schema => id: "test1" }; + entity.set_vid(1).unwrap(); + let key = et.key(graph::data::store::Id::String("test1".into())); + let emod = EntityModification::Insert { + key, + data: Arc::new(entity), + block: 1, + end: None, + }; + + let mut group = RowGroup::new(et); + group.push(emod, 1).unwrap(); + + let chunks: Vec<_> = group.write_chunks(100).collect(); + let chunk = &chunks[0]; + let query = InsertQuery::new(table.as_ref(), chunk).unwrap(); + debug_query::(&query).to_string() +} + +#[test] +fn skip_duplicates_insert_generates_on_conflict() { + let schema = " + type Thing @entity(immutable: true, skipDuplicates: true) { + id: String! + }"; + let sql = insert_sql_for_schema(schema, "Thing"); + assert!( + sql.contains("ON CONFLICT"), + "Expected ON CONFLICT in SQL for skip_duplicates immutable table, got: {}", + sql + ); + assert!( + sql.contains("DO NOTHING"), + "Expected DO NOTHING in SQL for skip_duplicates immutable table, got: {}", + sql + ); +} + +#[test] +fn default_immutable_insert_has_no_on_conflict_skip_duplicates() { + let schema = " + type Thing @entity(immutable: true) { + id: String! + }"; + let sql = insert_sql_for_schema(schema, "Thing"); + assert!( + !sql.contains("ON CONFLICT"), + "Default immutable table should NOT have ON CONFLICT, got: {}", + sql + ); +} diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 1c746f1338e..eee8b6073f1 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2584,6 +2584,12 @@ impl<'a> QueryFragment for InsertQuery<'a> { out.push_sql(")"); } + if self.table.immutable && self.table.skip_duplicates { + out.push_sql("\n ON CONFLICT ("); + out.push_identifier(self.table.primary_key().name.as_str())?; + out.push_sql(") DO NOTHING"); + } + Ok(()) } } diff --git a/store/postgres/src/writable.rs b/store/postgres/src/writable.rs index 958ba2def76..680a217c8be 100644 --- a/store/postgres/src/writable.rs +++ b/store/postgres/src/writable.rs @@ -1839,6 +1839,7 @@ impl WritableStoreTrait for WritableStore { deterministic_errors, processed_data_sources, is_non_fatal_errors_active, + self.store.logger.clone(), )?; self.writer.write(batch, stopwatch).await?; diff --git a/store/test-store/tests/postgres/relational.rs b/store/test-store/tests/postgres/relational.rs index 4a029709408..57cac35cc2b 100644 --- a/store/test-store/tests/postgres/relational.rs +++ b/store/test-store/tests/postgres/relational.rs @@ -128,6 +128,11 @@ const THINGS_GQL: &str = r#" order: Int, } + type SkipDupMink @entity(immutable: true, skipDuplicates: true) { + id: ID!, + order: Int, + } + type User @entity { id: ID!, name: String!, @@ -228,6 +233,7 @@ lazy_static! { static ref CAT_TYPE: EntityType = THINGS_SCHEMA.entity_type("Cat").unwrap(); static ref FERRET_TYPE: EntityType = THINGS_SCHEMA.entity_type("Ferret").unwrap(); static ref MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("Mink").unwrap(); + static ref SKIP_DUP_MINK_TYPE: EntityType = THINGS_SCHEMA.entity_type("SkipDupMink").unwrap(); static ref CHAIR_TYPE: EntityType = THINGS_SCHEMA.entity_type("Chair").unwrap(); static ref NULLABLE_STRINGS_TYPE: EntityType = THINGS_SCHEMA.entity_type("NullableStrings").unwrap(); @@ -996,7 +1002,7 @@ async fn conflicting_entity() { let fred = entity! { layout.input_schema => id: id.clone(), name: id.clone() }; let fred = Arc::new(fred); let types: Vec<_> = types.into_iter().cloned().collect(); - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new(entity_type.clone()); group .push( EntityModification::Insert { @@ -2126,3 +2132,59 @@ async fn check_filters() { }) .await; } + +fn store_layer_row_group_update( + entity_type: &EntityType, + block: BlockNumber, + data: impl IntoIterator, +) -> RowGroup { + let mut group = RowGroup::new(entity_type.clone()); + for (key, data) in data { + group + .push(EntityModification::overwrite(key, data, block), block) + .unwrap(); + } + group +} + +fn store_layer_row_group_delete( + entity_type: &EntityType, + block: BlockNumber, + data: impl IntoIterator, +) -> RowGroup { + let mut group = RowGroup::new(entity_type.clone()); + for key in data { + group + .push(EntityModification::remove(key, block), block) + .unwrap(); + } + group +} + +#[graph::test] +async fn skip_duplicates_update_returns_ok() { + run_test(async |conn, layout| { + let entity = entity! { layout.input_schema => + id: "sd1", + order: 1, + vid: 0i64 + }; + let key = SKIP_DUP_MINK_TYPE.key(entity.id()); + let entities = vec![(key, entity)]; + let group = store_layer_row_group_update(&SKIP_DUP_MINK_TYPE, 1, entities); + let result = layout.update(conn, &group, &MOCK_STOPWATCH).await; + assert_eq!(result.unwrap(), 0); + }) + .await; +} + +#[graph::test] +async fn skip_duplicates_delete_returns_ok() { + run_test(async |conn, layout| { + let key = SKIP_DUP_MINK_TYPE.parse_key("sd1").unwrap(); + let group = store_layer_row_group_delete(&SKIP_DUP_MINK_TYPE, 1, vec![key]); + let result = layout.delete(conn, &group, &MOCK_STOPWATCH).await; + assert_eq!(result.unwrap(), 0); + }) + .await; +} diff --git a/store/test-store/tests/postgres/relational_bytes.rs b/store/test-store/tests/postgres/relational_bytes.rs index 470d4e17412..2c4ac5f27c7 100644 --- a/store/test-store/tests/postgres/relational_bytes.rs +++ b/store/test-store/tests/postgres/relational_bytes.rs @@ -85,7 +85,7 @@ pub fn row_group_update( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new(entity_type.clone()); for (key, data) in data { group .push(EntityModification::overwrite(key, data, block), block) @@ -99,7 +99,7 @@ pub fn row_group_insert( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new(entity_type.clone()); for (key, data) in data { group .push(EntityModification::insert(key, data, block), block) @@ -113,7 +113,7 @@ pub fn row_group_delete( block: BlockNumber, data: impl IntoIterator, ) -> RowGroup { - let mut group = RowGroup::new(entity_type.clone(), false); + let mut group = RowGroup::new(entity_type.clone()); for key in data { group .push(EntityModification::remove(key, block), block) diff --git a/tests/runner-tests/skip-duplicates/abis/Contract.abi b/tests/runner-tests/skip-duplicates/abis/Contract.abi new file mode 100644 index 00000000000..9d9f56b9263 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/abis/Contract.abi @@ -0,0 +1,15 @@ +[ + { + "anonymous": false, + "inputs": [ + { + "indexed": false, + "internalType": "string", + "name": "testCommand", + "type": "string" + } + ], + "name": "TestEvent", + "type": "event" + } +] diff --git a/tests/runner-tests/skip-duplicates/package.json b/tests/runner-tests/skip-duplicates/package.json new file mode 100644 index 00000000000..da0ba3e3e87 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/package.json @@ -0,0 +1,13 @@ +{ + "name": "skip-duplicates", + "version": "0.0.0", + "private": true, + "scripts": { + "codegen": "graph codegen --skip-migrations", + "deploy:test": "graph deploy test/skip-duplicates --version-label v0.0.1 --ipfs $IPFS_URI --node $GRAPH_NODE_ADMIN_URI" + }, + "devDependencies": { + "@graphprotocol/graph-cli": "0.60.0", + "@graphprotocol/graph-ts": "0.31.0" + } +} diff --git a/tests/runner-tests/skip-duplicates/schema.graphql b/tests/runner-tests/skip-duplicates/schema.graphql new file mode 100644 index 00000000000..94c5a2862d0 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/schema.graphql @@ -0,0 +1,4 @@ +type Ping @entity(immutable: true, skipDuplicates: true) { + id: ID! + value: String! +} diff --git a/tests/runner-tests/skip-duplicates/src/mapping.ts b/tests/runner-tests/skip-duplicates/src/mapping.ts new file mode 100644 index 00000000000..bf756595b5d --- /dev/null +++ b/tests/runner-tests/skip-duplicates/src/mapping.ts @@ -0,0 +1,8 @@ +import { ethereum } from "@graphprotocol/graph-ts"; +import { Ping } from "../generated/schema"; + +export function handleBlock(block: ethereum.Block): void { + let entity = new Ping("duplicate-entity"); + entity.value = "ping"; + entity.save(); +} diff --git a/tests/runner-tests/skip-duplicates/subgraph.yaml b/tests/runner-tests/skip-duplicates/subgraph.yaml new file mode 100644 index 00000000000..068301105c2 --- /dev/null +++ b/tests/runner-tests/skip-duplicates/subgraph.yaml @@ -0,0 +1,25 @@ +specVersion: 0.0.8 +schema: + file: ./schema.graphql +dataSources: + - kind: ethereum/contract + name: Contract + network: test + source: + address: "0x0000000000000000000000000000000000000000" + abi: Contract + mapping: + kind: ethereum/events + apiVersion: 0.0.7 + language: wasm/assemblyscript + entities: + - Ping + abis: + - name: Contract + file: ./abis/Contract.abi + blockHandlers: + - handler: handleBlock + filter: + kind: polling + every: 1 + file: ./src/mapping.ts diff --git a/tests/tests/runner_tests.rs b/tests/tests/runner_tests.rs index 27ffe9422f6..a215d270a5d 100644 --- a/tests/tests/runner_tests.rs +++ b/tests/tests/runner_tests.rs @@ -1381,3 +1381,40 @@ async fn aggregation_current_bucket() { }) ); } + +#[graph::test] +async fn skip_duplicates() { + let RunnerTestRecipe { stores, test_info } = + RunnerTestRecipe::new("skip_duplicates", "skip-duplicates").await; + + let blocks = { + let block_0 = genesis(); + let mut block_1 = empty_block(block_0.ptr(), test_ptr(1)); + push_test_polling_trigger(&mut block_1); + let mut block_2 = empty_block(block_1.ptr(), test_ptr(2)); + push_test_polling_trigger(&mut block_2); + let mut block_3 = empty_block(block_2.ptr(), test_ptr(3)); + push_test_polling_trigger(&mut block_3); + let mut block_4 = empty_block(block_3.ptr(), test_ptr(4)); + push_test_polling_trigger(&mut block_4); + vec![block_0, block_1, block_2, block_3, block_4] + }; + + let stop_block = blocks.last().unwrap().block.ptr(); + let chain = chain(&test_info.test_name, blocks, &stores, None).await; + let ctx = fixture::setup(&test_info, &stores, &chain, None, None).await; + + ctx.start_and_sync_to(stop_block).await; + + let query_res = ctx + .query(r#"{ ping(id: "duplicate-entity") { id, value } }"#) + .await + .unwrap(); + + assert_eq!( + query_res, + Some(object! { + ping: object! { id: "duplicate-entity", value: "ping" } + }) + ); +}