diff --git a/backend/src/database/repositories/organizationRepository.ts b/backend/src/database/repositories/organizationRepository.ts index d1cd51a443..af57c1c9c4 100644 --- a/backend/src/database/repositories/organizationRepository.ts +++ b/backend/src/database/repositories/organizationRepository.ts @@ -1155,6 +1155,7 @@ class OrganizationRepository { OrganizationField.INDUSTRY, OrganizationField.FOUNDED, OrganizationField.IS_TEAM_ORGANIZATION, + OrganizationField.IS_AFFILIATION_BLOCKED, OrganizationField.MANUALLY_CREATED, ]) diff --git a/backend/src/services/organizationService.ts b/backend/src/services/organizationService.ts index e42dbabfd8..afdcb6ea5e 100644 --- a/backend/src/services/organizationService.ts +++ b/backend/src/services/organizationService.ts @@ -27,6 +27,7 @@ import { findOrgById, upsertOrgIdentities, } from '@crowd/data-access-layer/src/organizations' +import { findSegmentByName } from '@crowd/data-access-layer/src/segments' import { LoggerBase } from '@crowd/logging' import { WorkflowIdReusePolicy } from '@crowd/temporal' import { @@ -913,6 +914,20 @@ export default class OrganizationService extends LoggerBase { await upsertOrgIdentities(qx, record.id, data.identities) } else { + if (data.displayName) { + // Block organization affiliation if a segment (project, subproject, or project group) + // has the same name as the organization when creating one. + const segment = await findSegmentByName(qx, data.displayName) + if (segment) { + this.log.info( + { displayName: data.displayName }, + 'Found segment with the same name as the organization, blocking affiliation!', + ) + + data.isAffiliationBlocked = true + } + } + record = await OrganizationRepository.create(data, txOptions) telemetryTrack( 'Organization created', @@ -944,10 +959,9 @@ export default class OrganizationService extends LoggerBase { await SequelizeRepository.commitTransaction(transaction) - if (syncOptions.doSync) { - const searchSyncService = new SearchSyncService(this.options, syncOptions.mode) - await searchSyncService.triggerOrganizationSync(record.id) - } + await this.startOrganizationUpdateWorkflow(record.id, { + syncToOpensearch: syncOptions.doSync, + }) return result } catch (error) { @@ -1069,25 +1083,9 @@ export default class OrganizationService extends LoggerBase { await SequelizeRepository.commitTransaction(tx) - await this.options.temporal.workflow.start('organizationUpdate', { - taskQueue: 'profiles', - workflowId: `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${id}`, - workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, - retry: { - maximumAttempts: 10, - }, - args: [ - { - organization: { - id: record.id, - }, - recalculateAffiliations, - syncOptions: { - doSync: syncToOpensearch, - withAggs: false, - }, - }, - ], + await this.startOrganizationUpdateWorkflow(record.id, { + recalculateAffiliations, + syncToOpensearch, }) return record @@ -1232,4 +1230,29 @@ export default class OrganizationService extends LoggerBase { throw error } } + + async startOrganizationUpdateWorkflow( + organizationId: string, + { syncToOpensearch = false, recalculateAffiliations = false }, + ) { + await this.options.temporal.workflow.start('organizationUpdate', { + taskQueue: 'profiles', + workflowId: `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`, + workflowIdReusePolicy: WorkflowIdReusePolicy.WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING, + retry: { + maximumAttempts: 10, + }, + args: [ + { + organization: { + id: organizationId, + }, + recalculateAffiliations, + syncOptions: { + doSync: syncToOpensearch, + }, + }, + ], + }) + } } diff --git a/backend/src/services/segmentService.ts b/backend/src/services/segmentService.ts index c4678a8638..878dc3ba90 100644 --- a/backend/src/services/segmentService.ts +++ b/backend/src/services/segmentService.ts @@ -1,8 +1,13 @@ import { Transaction } from 'sequelize' import { Error400, validateNonLfSlug } from '@crowd/common' -import { QueryExecutor } from '@crowd/data-access-layer' +import { + QueryExecutor, + findOrganizationsByName, + updateOrganization, +} from '@crowd/data-access-layer' import { ICreateInsightsProject, findBySlug } from '@crowd/data-access-layer/src/collections' +import { applyOrganizationAffiliationPolicyToMembers } from '@crowd/data-access-layer/src/member_organization_affiliation_overrides' import { buildSegmentActivityTypes, isSegmentSubproject, @@ -25,6 +30,7 @@ import SequelizeRepository from '../database/repositories/sequelizeRepository' import { IServiceOptions } from './IServiceOptions' import { CollectionService } from './collectionService' +import OrganizationService from './organizationService' interface UnnestedActivityTypes { [key: string]: any @@ -131,8 +137,24 @@ export default class SegmentService extends LoggerBase { transaction, ) + // Block org affiliation if project group name matches an organization name + const orgIds = await this.blockOrganizationAffiliationIfSegmentNameMatches( + data.name, + transaction, + ) + await SequelizeRepository.commitTransaction(transaction) + const organizationService = new OrganizationService(this.options) + + for (const orgId of orgIds) { + // Trigger org update workflow to recalculate affiliations + await organizationService.startOrganizationUpdateWorkflow(orgId, { + syncToOpensearch: true, + recalculateAffiliations: true, + }) + } + return await this.findById(projectGroup.id) } catch (error) { await SequelizeRepository.rollbackTransaction(transaction) @@ -192,8 +214,24 @@ export default class SegmentService extends LoggerBase { transaction, ) + const organizationService = new OrganizationService(this.options) + + // Block org affiliation if project name matches an organization name + const orgIds = await this.blockOrganizationAffiliationIfSegmentNameMatches( + data.name, + transaction, + ) + await SequelizeRepository.commitTransaction(transaction) + for (const orgId of orgIds) { + // Trigger org update workflow to recalculate affiliations + await organizationService.startOrganizationUpdateWorkflow(orgId, { + syncToOpensearch: true, + recalculateAffiliations: true, + }) + } + return await this.findById(project.id) } catch (error) { await SequelizeRepository.rollbackTransaction(transaction) @@ -202,10 +240,35 @@ export default class SegmentService extends LoggerBase { } async createSubproject(data: SegmentData): Promise { - return SequelizeRepository.withTx(this.options, async (tx) => { - const qx = SequelizeRepository.getQueryExecutor({ ...this.options, transaction: tx }) - return this.createSubprojectInternal(data, qx, tx) - }) + const transaction = await SequelizeRepository.createTransaction(this.options) + const qx = SequelizeRepository.getQueryExecutor({ ...this.options, transaction }) + + try { + const subproject = await this.createSubprojectInternal(data, qx, transaction) + + const orgIds = await this.blockOrganizationAffiliationIfSegmentNameMatches( + data.name, + transaction, + ) + + await SequelizeRepository.commitTransaction(transaction) + + if (orgIds?.length) { + const organizationService = new OrganizationService(this.options) + + for (const orgId of orgIds) { + await organizationService.startOrganizationUpdateWorkflow(orgId, { + syncToOpensearch: true, + recalculateAffiliations: true, + }) + } + } + + return await this.findById(subproject.id) + } catch (error) { + await SequelizeRepository.rollbackTransaction(transaction) + throw error + } } async createSubprojectInternal( @@ -638,6 +701,37 @@ export default class SegmentService extends LoggerBase { } } + private async blockOrganizationAffiliationIfSegmentNameMatches( + segmentName: string, + transaction: Transaction, + ): Promise { + const qx = SequelizeRepository.getQueryExecutor({ + ...this.options, + transaction, + }) + + // Check if there is an existing organization with segment name + const organizations = await findOrganizationsByName(qx, segmentName) + + if (organizations.length === 0) { + return [] + } + + const result: string[] = [] + + for (const o of organizations) { + if (!o.isAffiliationBlocked) { + const updatedOrgId = await updateOrganization(qx, o.id, { isAffiliationBlocked: true }) + if (updatedOrgId) { + await applyOrganizationAffiliationPolicyToMembers(qx, updatedOrgId, false) + result.push(updatedOrgId) + } + } + } + + return result + } + /** * Throws an appropriate error message when a segment conflict is detected. * This method dynamically generates error messages based on the existing conflicting segment, diff --git a/services/libs/data-access-layer/src/organizations/attributesConfig.ts b/services/libs/data-access-layer/src/organizations/attributesConfig.ts index 8010ceaeb7..bc771252dd 100644 --- a/services/libs/data-access-layer/src/organizations/attributesConfig.ts +++ b/services/libs/data-access-layer/src/organizations/attributesConfig.ts @@ -17,6 +17,7 @@ export const ORG_DB_FIELDS = [ 'importHash', 'location', 'isTeamOrganization', + 'isAffiliationBlocked', 'type', 'size', 'headline', diff --git a/services/libs/data-access-layer/src/organizations/base.ts b/services/libs/data-access-layer/src/organizations/base.ts index b451244e50..6b4c8f90d0 100644 --- a/services/libs/data-access-layer/src/organizations/base.ts +++ b/services/libs/data-access-layer/src/organizations/base.ts @@ -15,6 +15,7 @@ import { } from '@crowd/types' import { QueryExecutor } from '../queryExecutor' +import { findSegmentByName } from '../segments' import { QueryOptions, QueryResult, queryTable, queryTableById } from '../utils' import { prepareSelectColumns } from '../utils' @@ -36,6 +37,7 @@ const ORG_SELECT_COLUMNS = [ 'importHash', 'location', 'isTeamOrganization', + 'isAffiliationBlocked', 'type', 'size', 'headline', @@ -125,23 +127,25 @@ export async function findOrgsByIds( return results } -export async function findOrgByName( +export async function findOrganizationsByName( qx: QueryExecutor, name: string, -): Promise { - const result = await qx.selectOneOrNone( + options: { limit?: number } = {}, +): Promise { + const { limit } = options + + return qx.select( ` - select ${prepareSelectColumns(ORG_SELECT_COLUMNS, 'o')} - from organizations o - where trim(lower(o."displayName")) = trim(lower($(name))) - limit 1; + select ${prepareSelectColumns(ORG_SELECT_COLUMNS, 'o')} + from organizations o + where lower(trim(o."displayName")) = lower(trim($(name))) + ${limit !== undefined ? 'limit $(limit)' : ''} `, { name, + limit, }, ) - - return result } export async function findOrgByVerifiedDomain( @@ -382,29 +386,34 @@ export async function insertOrganization( export async function updateOrganization( qe: QueryExecutor, organizationId: string, - data: IDbOrganizationInput, -): Promise { + data: Partial, +): Promise { const columns = Object.keys(data) if (columns.length === 0) { - return + return null } const updatedAt = new Date() - const oneMinuteAgo = new Date(updatedAt.getTime() - 60 * 1000) columns.push('updatedAt') const query = ` update organizations set ${columns.map((c) => `"${c}" = $(${c})`).join(',\n')} - where id = $(organizationId) and "updatedAt" <= $(oneMinuteAgo) + where id = $(organizationId) + returning id; ` - await qe.selectNone(query, { + const result = await qe.selectOneOrNone(query, { ...data, organizationId, updatedAt, - oneMinuteAgo, }) + + if (!result) { + return null + } + + return result.id } export async function getTimeseriesOfNewOrganizations( @@ -499,11 +508,15 @@ export async function findOrCreateOrganization( } if (!existing) { - existing = await logExecutionTimeV2( - async () => findOrgByName(qe, data.displayName), + const organizations = await logExecutionTimeV2( + async () => findOrganizationsByName(qe, data.displayName, { limit: 1 }), log, - 'organizationService -> findOrCreateOrganization -> findOrgByName', + 'organizationService -> findOrCreateOrganization -> findOrganizationsByName', ) + + if (organizations.length > 0) { + existing = organizations[0] + } } let id @@ -561,7 +574,7 @@ export async function findOrCreateOrganization( log.trace(`Organization wasn't found via website or identities.`) const displayName = data.displayName ? data.displayName : verifiedIdentities[0].value - const payload = { + const payload: Partial = { displayName, description: data.description, logo: data.logo, @@ -575,6 +588,13 @@ export async function findOrCreateOrganization( founded: data.founded, } + // Block organization affiliation if a segment (project, subproject, or project group) + // has the same name as the organization when creating one. + const segment = await findSegmentByName(qe, displayName) + if (segment) { + payload.isAffiliationBlocked = true + } + log.trace({ data, payload }, `Preparing payload to create a new organization!`) const processed = prepareOrganizationData(payload, source) diff --git a/services/libs/data-access-layer/src/organizations/types.ts b/services/libs/data-access-layer/src/organizations/types.ts index 743e40bdc3..9c948f02bb 100644 --- a/services/libs/data-access-layer/src/organizations/types.ts +++ b/services/libs/data-access-layer/src/organizations/types.ts @@ -16,6 +16,7 @@ export interface IDbOrganization { importHash?: string location?: string isTeamOrganization: boolean + isAffiliationBlocked?: boolean type?: string size?: string headline?: string @@ -36,6 +37,7 @@ export interface IDbOrganizationInput { importHash?: string location?: string isTeamOrganization: boolean + isAffiliationBlocked?: boolean type?: string size?: string headline?: string diff --git a/services/libs/data-access-layer/src/segments/index.ts b/services/libs/data-access-layer/src/segments/index.ts index 9934fa622f..d56f02b367 100644 --- a/services/libs/data-access-layer/src/segments/index.ts +++ b/services/libs/data-access-layer/src/segments/index.ts @@ -22,6 +22,21 @@ export async function findProjectGroupByName( ) } +export async function findSegmentByName( + qx: QueryExecutor, + name: string, +): Promise { + return qx.selectOneOrNone( + ` + SELECT * + FROM segments + WHERE trim(lower(name)) = trim(lower($(name))) + LIMIT 1; + `, + { name }, + ) +} + export async function fetchManySegments( qx, segmentIds: string[],