Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion graph/examples/append_row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub fn main() -> anyhow::Result<()> {
};
mods.push(md);
}
let mut group = RowGroup::new(THING_TYPE.clone(), false);
let mut group = RowGroup::new(THING_TYPE.clone());

let start = Instant::now();
for md in mods {
Expand Down
114 changes: 99 additions & 15 deletions graph/src/components/store/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ use std::{
sync::Arc,
};

use slog::Logger;

use crate::{
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
cheap_clone::CheapClone,
Expand Down Expand Up @@ -338,24 +340,22 @@ impl LastMod {
#[derive(Debug, CacheWeight)]
pub struct RowGroup {
pub entity_type: EntityType,

/// All changes for this entity type, ordered by block; i.e., if `i < j`
/// then `rows[i].block() <= rows[j].block()`. Several methods on this
/// struct rely on the fact that this ordering is observed.
rows: Vec<EntityModification>,

immutable: bool,

/// Map the `key.entity_id` of all entries in `rows` to the index with
/// the most recent entry for that id to speed up lookups.
last_mod: LastMod,
}

impl RowGroup {
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
pub fn new(entity_type: EntityType) -> Self {
Self {
entity_type,
rows: Vec::new(),
immutable,
last_mod: LastMod::new(),
}
}
Expand Down Expand Up @@ -471,7 +471,7 @@ impl RowGroup {
/// Append `row` to `self.rows` by combining it with a previously
/// existing row, if that is possible
fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> {
if self.immutable {
if self.entity_type.is_immutable() {
match row {
EntityModification::Insert { .. } => {
// Check if this is an attempt to overwrite an immutable
Expand All @@ -488,6 +488,9 @@ impl RowGroup {
.and_then(|&idx| self.rows.get(idx))
{
Some(prev) if prev.block() != row.block() => {
if self.entity_type.skip_duplicates() {
return Ok(());
}
return Err(StoreError::Input(
format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}",
row.key(), row.block(), prev.block())));
Expand All @@ -498,6 +501,9 @@ impl RowGroup {
self.push_row(row);
}
EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => {
if self.entity_type.skip_duplicates() {
return Ok(());
}
return Err(internal_error!(
"immutable entity type {} only allows inserts, not {:?}",
self.entity_type,
Expand Down Expand Up @@ -604,8 +610,8 @@ impl RowGroup {
pub struct RowGroupForPerfTest(RowGroup);

impl RowGroupForPerfTest {
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
Self(RowGroup::new(entity_type, immutable))
pub fn new(entity_type: EntityType) -> Self {
Self(RowGroup::new(entity_type))
}

pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> {
Expand Down Expand Up @@ -661,11 +667,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> {
#[derive(Debug, CacheWeight)]
pub struct RowGroups {
/// The row groups held by this collection. `group_entry` appends a new
/// `RowGroup` here when it finds no existing entry for the requested
/// entity type.
pub groups: Vec<RowGroup>,
/// Logger handed in through `RowGroups::new`.
/// NOTE(review): no use of this logger is visible in this excerpt;
/// confirm where it is consumed.
logger: Logger,
}

impl RowGroups {
fn new() -> Self {
Self { groups: Vec::new() }
fn new(logger: Logger) -> Self {
Self {
groups: Vec::new(),
logger,
}
}

fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> {
Expand All @@ -684,9 +694,7 @@ impl RowGroups {
match pos {
Some(pos) => &mut self.groups[pos],
None => {
let immutable = entity_type.is_immutable();
self.groups
.push(RowGroup::new(entity_type.clone(), immutable));
self.groups.push(RowGroup::new(entity_type.clone()));
// unwrap: we just pushed an entry
self.groups.last_mut().unwrap()
}
Expand Down Expand Up @@ -784,6 +792,7 @@ impl Batch {
deterministic_errors: Vec<SubgraphError>,
offchain_to_remove: Vec<StoredDynamicDataSource>,
is_non_fatal_errors_active: bool,
logger: Logger,
) -> Result<Self, StoreError> {
let block = block_ptr.number;

Expand All @@ -797,7 +806,7 @@ impl Batch {
EntityModification::Remove { .. } => 0,
});

let mut mods = RowGroups::new();
let mut mods = RowGroups::new(logger);

for m in raw_mods {
mods.group_entry(&m.key().entity_type).push(m, block)?;
Expand Down Expand Up @@ -1079,7 +1088,6 @@ mod test {
let group = RowGroup {
entity_type: ENTRY_TYPE.clone(),
rows,
immutable: false,
last_mod,
};
let act = group
Expand Down Expand Up @@ -1120,6 +1128,8 @@ mod test {
type Thing @entity { id: ID!, count: Int! }
type RowGroup @entity { id: ID! }
type Entry @entity { id: ID! }
type ImmThing @entity(immutable: true) { id: ID!, count: Int! }
type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! }
"#;
lazy_static! {
static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap();
Expand All @@ -1128,6 +1138,8 @@ mod test {
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap();
static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap();
static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap();
static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap();
}

/// Convenient notation for changes to a fixed entity
Expand Down Expand Up @@ -1187,7 +1199,7 @@ mod test {
impl Group {
fn new() -> Self {
Self {
group: RowGroup::new(THING_TYPE.clone(), false),
group: RowGroup::new(THING_TYPE.clone()),
}
}

Expand Down Expand Up @@ -1292,4 +1304,76 @@ mod test {
let op = group.last_op(&key, 0);
assert_eq!(None, op);
}

/// Build an `Insert` modification for the entity `id` of `entity_type`
/// at `block`. The entity's `count` attribute is set to `block`.
///
/// Panics if `id` cannot be parsed into a key for `entity_type`.
fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
    let key = entity_type.parse_key(id).unwrap();
    let data = Arc::new(entity! { SCHEMA => id: id, count: block });
    EntityModification::Insert {
        key,
        data,
        block,
        end: None,
    }
}

/// Build an `Overwrite` modification for the entity `id` of
/// `entity_type` at `block`. The entity's `count` attribute is set to
/// `block`.
///
/// Panics if `id` cannot be parsed into a key for `entity_type`.
fn make_overwrite(
    entity_type: &EntityType,
    id: &str,
    block: BlockNumber,
) -> EntityModification {
    let key = entity_type.parse_key(id).unwrap();
    let data = Arc::new(entity! { SCHEMA => id: id, count: block });
    EntityModification::Overwrite {
        key,
        data,
        block,
        end: None,
    }
}

/// Build a `Remove` modification for the entity `id` of `entity_type`
/// at `block`.
///
/// Panics if `id` cannot be parsed into a key for `entity_type`.
fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
    let key = entity_type.parse_key(id).unwrap();
    EntityModification::Remove { key, block }
}

/// An immutable type without `skipDuplicates` must refuse a second
/// insert of the same id at a later block.
#[test]
fn append_row_immutable_default_rejects_cross_block_duplicate() {
    let mut group = RowGroup::new(IMM_THING_TYPE.clone());

    let first = make_insert(&IMM_THING_TYPE, "one", 1);
    assert!(group.push(first, 1).is_ok());

    let second = make_insert(&IMM_THING_TYPE, "one", 2);
    assert!(group.push(second, 2).is_err());
}

/// With `skipDuplicates`, re-inserting the same id at a later block
/// succeeds but does not add a second row.
#[test]
fn append_row_skip_duplicates_allows_cross_block_duplicate() {
    let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());

    let first = make_insert(&SKIP_DUP_THING_TYPE, "one", 1);
    assert!(group.push(first, 1).is_ok());

    let duplicate = make_insert(&SKIP_DUP_THING_TYPE, "one", 2);
    assert!(group.push(duplicate, 2).is_ok());

    assert_eq!(group.row_count(), 1);
}

/// With `skipDuplicates`, an `Overwrite` on an immutable type is
/// accepted without error and is dropped rather than stored.
#[test]
fn append_row_skip_duplicates_allows_overwrite() {
    let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
    let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1));
    assert!(res.is_ok());
    // `is_ok()` alone cannot tell "skipped" from "stored": make sure the
    // overwrite did not end up in the group.
    assert_eq!(group.row_count(), 0);
}

/// With `skipDuplicates`, a `Remove` on an immutable type is accepted
/// without error and is dropped rather than stored.
#[test]
fn append_row_skip_duplicates_allows_remove() {
    let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
    let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1));
    assert!(res.is_ok());
    // `is_ok()` alone cannot tell "skipped" from "stored": make sure the
    // removal did not end up in the group.
    assert_eq!(group.row_count(), 0);
}

/// Two inserts of the same id within the same block are both accepted
/// and both stored, even for a `skipDuplicates` type.
#[test]
fn append_row_skip_duplicates_same_block_still_pushes() {
    let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());

    let first = make_insert(&SKIP_DUP_THING_TYPE, "one", 1);
    assert!(group.push(first, 1).is_ok());

    let again = make_insert(&SKIP_DUP_THING_TYPE, "one", 1);
    assert!(group.push(again, 1).is_ok());

    assert_eq!(group.row_count(), 2);
}
}
4 changes: 4 additions & 0 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl EntityType {
self.schema.is_immutable(self.atom)
}

/// Return the schema's `skip_duplicates` setting for this entity type,
/// looked up by this type's atom.
///
/// NOTE(review): presumably reflects a `skipDuplicates: true` argument
/// on the type's `@entity` directive; confirm against the schema code.
pub fn skip_duplicates(&self) -> bool {
self.schema.skip_duplicates(self.atom)
}

/// Return the result of the schema's `id_type` lookup for this entity
/// type's atom; any error from that lookup is passed through unchanged.
pub fn id_type(&self) -> Result<IdType, Error> {
self.schema.id_type(self.atom)
}
Expand Down
Loading
Loading