Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PROD-972 ID-1324 Deduplicate group synchronization calls to google. #1472

Merged
merged 7 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ object Dependencies {
val postgresDriverVersion = "42.7.2"
val sentryVersion = "6.15.0"

val workbenchLibV = "a6ad7dc" // If updating this, make sure googleStorageLocal in test dependencies is up-to-date
val workbenchLibV = "8d55689" // If updating this, make sure googleStorageLocal in test dependencies is up-to-date
val workbenchUtilV = s"0.10-$workbenchLibV"
val workbenchUtil2V = s"0.9-$workbenchLibV"
val workbenchModelV = s"0.19-$workbenchLibV"
val workbenchModelV = s"0.20-$workbenchLibV"
val workbenchGoogleV = s"0.32-$workbenchLibV"
val workbenchGoogle2V = s"0.36-$workbenchLibV"
val workbenchNotificationsV = s"0.6-$workbenchLibV"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@
<include file="changesets/20231019_sam_user_attributes_table.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240417_action_managed_identities.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240416_add_sam_rac_tables.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240701_add_group_version_and_last_synchronized_version.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog logicalFilePath="dummy"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">

<!-- Adds two columns to sam_group: a non-null "version" and a nullable "last_synchronized_version". -->
<!-- NOTE(review): both columns are BIGINT here, while the Scala readers in this change read them as Int; confirm the width mismatch is intended. -->
<changeSet logicalFilePath="dummy" author="tgarwood" id="add_group_version_and_last_synchronized_version">

<addColumn tableName="sam_group">
<!-- version: NOT NULL with a default of 1, so every existing row starts at version 1. -->
<!-- updateGroupUpdatedDateAndVersion increments this column by 1 each time it runs for the group. -->
<column name="version" type="BIGINT" defaultValue="1">
<constraints nullable="false"/>
</column>
</addColumn>

<!-- last_synchronized_version: nullable with no default, so every existing row starts as NULL. -->
<!-- The sync code treats NULL as 0 (COALESCE in the update, getOrElse(0) in the synchronizer), so existing groups count as not yet synchronized. -->
<addColumn tableName="sam_group">
<column name="last_synchronized_version" type="BIGINT">
<constraints nullable="true"/>
</column>
</addColumn>

</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ trait DirectoryDAO {

def isGroupMember(groupId: WorkbenchGroupIdentity, member: WorkbenchSubject, samRequestContext: SamRequestContext): IO[Boolean]

def updateSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit]
def updateSynchronizedDateAndVersion(group: WorkbenchGroup, samRequestContext: SamRequestContext): IO[Unit]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want to pass in the group here so that we can use the version of the group that is held in memory when the sync started.


def updateGroupUpdatedDateAndVersionWithSession(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit]
Copy link
Contributor Author

@Ghost-in-a-Jar Ghost-in-a-Jar Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed to update the policies in ResourceService; it just creates a write DB session and then calls updateGroupUpdatedDateAndVersion.

Copy link
Contributor Author

@Ghost-in-a-Jar Ghost-in-a-Jar Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updateGroupUpdatedDateAndVersion is also only on PostgresGroupDAO, which ResourceService doesn't have access to.


def getSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Option[Date]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ class PostgresAccessPolicyDAO(
}
removeAllGroupMembers(groupId)
insertGroupMembers(groupId, memberList)
updateGroupUpdatedDateAndVersion(id)
}

override def overwritePolicy(newPolicy: AccessPolicy, samRequestContext: SamRequestContext): IO[AccessPolicy] =
Expand Down Expand Up @@ -1084,7 +1085,7 @@ class PostgresAccessPolicyDAO(
val ra = ResourceActionTable.syntax("ra")

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${rr.result.role}, ${ra.result.action}
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${rr.result.role}, ${ra.result.action}
from ${GroupTable as g}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
join ${ResourceTable as r} on ${p.resourceId} = ${r.id}
Expand All @@ -1106,7 +1107,9 @@ class PostgresAccessPolicyDAO(
rs.get[ResourceId](r.resultName.name),
rs.get[ResourceTypeName](rt.resultName.name),
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(rs.stringOpt(rr.resultName.role).map(ResourceRoleName(_)), rs.stringOpt(ra.resultName.action).map(ResourceAction(_)))
)
Expand Down Expand Up @@ -1163,7 +1166,9 @@ class PostgresAccessPolicyDAO(
policyRoles,
policyActions,
policyDescendantPermissions,
policyInfo.public
policyInfo.public,
policyInfo.version,
policyInfo.lastSynchronizedVersion
)
}
.to(LazyList)
Expand Down Expand Up @@ -1215,7 +1220,7 @@ class PostgresAccessPolicyDAO(
val part = ResourceTypeTable.syntax("part") // policy action resource type

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${g.result.email}, ${p.result.public}, ${prrt.result.name}, ${rr.result.role}, ${pr.result.descendantsOnly}, ${part.result.name}, ${ra.result.action}, ${pa.result.descendantsOnly}
samsql"""select ${p.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${prrt.result.name}, ${rr.result.role}, ${pr.result.descendantsOnly}, ${part.result.name}, ${ra.result.action}, ${pa.result.descendantsOnly}
from ${GroupTable as g}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
left join ${PolicyRoleTable as pr} on ${p.id} = ${pr.resourcePolicyId}
Expand All @@ -1234,7 +1239,9 @@ class PostgresAccessPolicyDAO(
resource.resourceId,
resource.resourceTypeName,
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(
RoleResult(
Expand Down Expand Up @@ -1469,7 +1476,7 @@ class PostgresAccessPolicyDAO(
val ra = ResourceActionTable.syntax("ra")

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${rr.result.role}, ${ra.result.action}
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${rr.result.role}, ${ra.result.action}
from ${GroupTable as g}
join ${GroupMemberFlatTable as f} on ${f.groupId} = ${g.id}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
Expand All @@ -1490,7 +1497,9 @@ class PostgresAccessPolicyDAO(
rs.get[ResourceId](r.resultName.name),
rs.get[ResourceTypeName](rt.resultName.name),
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(rs.stringOpt(rr.resultName.role).map(ResourceRoleName(_)), rs.stringOpt(ra.resultName.action).map(ResourceAction(_)))
)
Expand Down Expand Up @@ -2057,7 +2066,15 @@ class PostgresAccessPolicyDAO(
}
}

private final case class PolicyInfo(name: AccessPolicyName, resourceId: ResourceId, resourceTypeName: ResourceTypeName, email: WorkbenchEmail, public: Boolean)
private final case class PolicyInfo(
name: AccessPolicyName,
resourceId: ResourceId,
resourceTypeName: ResourceTypeName,
email: WorkbenchEmail,
public: Boolean,
version: Int,
lastSynchronizedVersion: Option[Int]
)
private final case class RoleResult(resourceTypeName: Option[ResourceTypeName], role: Option[ResourceRoleName], descendantsOnly: Option[Boolean])
private final case class ActionResult(resourceTypeName: Option[ResourceTypeName], action: Option[ResourceAction], descendantsOnly: Option[Boolean])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
val r = ResourceTable.syntax("r")
val rt = ResourceTypeTable.syntax("rt")

samsql"""select ${g.result.email}, ${gm.result.memberUserId}, ${sg.result.name}, ${p.result.name}, ${r.result.name}, ${rt.result.name}
samsql"""select ${g.result.email}, ${gm.result.memberUserId}, ${sg.result.name}, ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.version}, ${g.result.lastSynchronizedVersion}
from ${GroupTable as g}
left join ${GroupMemberTable as gm} on ${g.id} = ${gm.groupId}
left join ${GroupTable as sg} on ${gm.memberGroupId} = ${sg.id}
Expand All @@ -100,7 +100,9 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
rs.stringOpt(sg.resultName.name).map(WorkbenchGroupName),
rs.stringOpt(p.resultName.name).map(AccessPolicyName(_)),
rs.stringOpt(r.resultName.name).map(ResourceId(_)),
rs.stringOpt(rt.resultName.name).map(ResourceTypeName(_))
rs.stringOpt(rt.resultName.name).map(ResourceTypeName(_)),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
)
}
.list()
Expand All @@ -112,13 +114,16 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
} else {
val email = results.head._1
val members: Set[WorkbenchSubject] = results.collect {
case (_, Some(userId), None, None, None, None) => userId
case (_, None, Some(subGroupName), None, None, None) => subGroupName
case (_, None, Some(_), Some(policyName), Some(resourceName), Some(resourceTypeName)) =>
case (_, Some(userId), None, None, None, None, _, _) => userId
case (_, None, Some(subGroupName), None, None, None, _, _) => subGroupName
case (_, None, Some(_), Some(policyName), Some(resourceName), Some(resourceTypeName), _, _) =>
FullyQualifiedPolicyId(FullyQualifiedResourceId(resourceTypeName, resourceName), policyName)
}.toSet

Option(BasicWorkbenchGroup(groupName, members, email))
val version = results.head._7
val lastSynchronized = results.head._8

Option(BasicWorkbenchGroup(groupName, members, email, version, lastSynchronized))
}

override def loadGroupEmail(groupName: WorkbenchGroupName, samRequestContext: SamRequestContext): IO[Option[WorkbenchEmail]] =
Expand Down Expand Up @@ -154,7 +159,7 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
serializableWriteTransaction("addGroupMember", samRequestContext) { implicit session =>
val numberAdded = insertGroupMembers(queryForGroupPKs(Set(groupId)).head, Set(addMember))
if (numberAdded > 0) {
updateGroupUpdatedDate(groupId)
updateGroupUpdatedDateAndVersion(groupId)
true
} else {
false
Expand All @@ -169,21 +174,34 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
val removed = removeGroupMember(groupId, removeMember)

if (removed) {
updateGroupUpdatedDate(groupId)
updateGroupUpdatedDateAndVersion(groupId)
}

removed
}

/** Runs `updateGroupUpdatedDateAndVersion` for `groupId` inside its own `serializableWriteTransaction`, so callers do not need to supply a DB session.
  *
  * The row count returned by the underlying update is discarded.
  */
override def updateGroupUpdatedDateAndVersionWithSession(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit] = {
  val transactionName = "updateGroupUpdatedDateAndVersionWithSession"
  serializableWriteTransaction(transactionName, samRequestContext) { implicit dbSession =>
    updateGroupUpdatedDateAndVersion(groupId)
  }
}

/** Checks whether `member` belongs to the group identified by `groupId`.
  *
  * Delegates to the session-based `isGroupMember(groupId, member)` overload inside a `readOnlyTransaction`.
  */
override def isGroupMember(groupId: WorkbenchGroupIdentity, member: WorkbenchSubject, samRequestContext: SamRequestContext): IO[Boolean] = {
  val transactionName = "isGroupMember"
  readOnlyTransaction(transactionName, samRequestContext) { implicit dbSession =>
    isGroupMember(groupId, member)
  }
}

override def updateSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit] =
serializableWriteTransaction("updateSynchronizedDate", samRequestContext) { implicit session =>
/*
Update last synchronized version only when it is less than the current group version. This is to avoid
threads stepping over each other and causing sam to become out of sync with google. The last synchronized version
should only be set to the version of the group that is held in memory from when the sync started.
*/
override def updateSynchronizedDateAndVersion(group: WorkbenchGroup, samRequestContext: SamRequestContext): IO[Unit] =
serializableWriteTransaction("updateSynchronizedDateAndVersion", samRequestContext) { implicit session =>
val g = GroupTable.column
samsql"update ${GroupTable.table} set ${g.synchronizedDate} = ${Instant.now()} where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})"
samsql"""update ${GroupTable.table}
set ${g.synchronizedDate} = ${Instant.now()},
${g.lastSynchronizedVersion} = ${group.version}
where ${g.id} = (${workbenchGroupIdentityToGroupPK(group.id)}) and COALESCE(${g.lastSynchronizedVersion}, 0) < ${group.version}"""
.update()
.apply()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ trait PostgresGroupDAO {
query.map(rs => rs.int(1)).single().apply().getOrElse(0) > 0
}

def updateGroupUpdatedDate(groupId: WorkbenchGroupIdentity)(implicit session: DBSession): Int = {
def updateGroupUpdatedDateAndVersion(groupId: WorkbenchGroupIdentity)(implicit session: DBSession): Int = {
val g = GroupTable.column
samsql"update ${GroupTable.table} set ${g.updatedDate} = ${Instant.now()} where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})".update().apply()
samsql"""update ${GroupTable.table}
set ${g.updatedDate} = ${Instant.now()},
${g.version} = ${g.version} + 1
where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})""".update().apply()
}

def deleteGroup(groupName: WorkbenchGroupName)(implicit session: DBSession): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ import scalikejdbc._
import java.time.Instant

final case class GroupPK(value: Long) extends DatabaseKey
final case class GroupRecord(id: GroupPK, name: WorkbenchGroupName, email: WorkbenchEmail, updatedDate: Option[Instant], synchronizedDate: Option[Instant])
final case class GroupRecord(
id: GroupPK,
name: WorkbenchGroupName,
email: WorkbenchEmail,
version: Int,
lastSynchronizedVersion: Option[Int],
updatedDate: Option[Instant],
synchronizedDate: Option[Instant]
)

object GroupTable extends SQLSyntaxSupportWithDefaultSamDB[GroupRecord] {
override def tableName: String = "SAM_GROUP"
Expand All @@ -17,6 +25,8 @@ object GroupTable extends SQLSyntaxSupportWithDefaultSamDB[GroupRecord] {
rs.get(e.id),
rs.get(e.name),
rs.get(e.email),
rs.get(e.version),
rs.get(e.lastSynchronizedVersion),
rs.timestampOpt(e.updatedDate).map(_.toInstant),
rs.timestampOpt(e.synchronizedDate).map(_.toInstant)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class GoogleGroupSyncMessageReceiver(groupSynchronizer: GoogleGroupSynchronizer)
logger.info(s"group to synchronize not found: ${groupNotFound.errorReport}")
consumer.ack()

case groupAlreadySynchronized: GroupAlreadySynchronized =>
// this can happen if a group is synchronized multiple times in quick succession
// acknowledge it so we don't have to handle it again
logger.info(s"group already previously synchronized: ${groupAlreadySynchronized.getMessage}")
consumer.ack()

case regrets: Throwable =>
logger.error("failure synchronizing google group", regrets)
consumer.nack() // redeliver message to hopefully rectify the failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import org.broadinstitute.dsde.workbench.sam.util.OpenTelemetryIOUtils._
import org.broadinstitute.dsde.workbench.sam.util.SamRequestContext
import org.broadinstitute.dsde.workbench.util.FutureSupport

/** Signals that a group synchronization was skipped because the group has nothing new to synchronize.
  *
  * GoogleGroupSynchronizer raises this when the loaded group's version is not greater than its lastSynchronizedVersion (a missing value counts as 0).
  * GoogleGroupSyncMessageReceiver logs it at info level and acknowledges the message so it is not handled again.
  *
  * @param errorReport
  *   report carried by the exception; defaults to a Conflict status with the message "Group has already been synchronized"
  */
class GroupAlreadySynchronized(errorReport: ErrorReport = ErrorReport(StatusCodes.Conflict, "Group has already been synchronized"))
extends WorkbenchExceptionWithErrorReport(errorReport)

/** This class makes sure that our google groups have the right members.
*
* For the simple case it merely compares group membership given by directoryDAO against group membership given by googleDirectoryDAO and does the appropriate
Expand Down Expand Up @@ -51,7 +54,15 @@ class GoogleGroupSynchronizer(
IO.pure(Map.empty)
} else {
for {
group <- loadSamGroup(groupId, samRequestContext)
group: WorkbenchGroup <- loadSamGroup(groupId, samRequestContext)
// If group.version > group.lastSynchronizedVersion, then the group needs to be synchronized
// Else Noop
_ <-
if (group.version > group.lastSynchronizedVersion.getOrElse(0)) {
IO.unit
} else {
IO.raiseError(new GroupAlreadySynchronized)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really like handling this with an exception since this is not really an exceptional situation, we just want an early escape.

}
members <- calculateAuthDomainIntersectionIfRequired(group, samRequestContext)
subGroupSyncs <- syncSubGroupsIfRequired(group, visitedGroups, samRequestContext)
googleMemberEmails <- loadGoogleGroupMemberEmailsMaybeCreateGroup(group, samRequestContext)
Expand All @@ -63,7 +74,7 @@ class GoogleGroupSynchronizer(
addedUserSyncReports <- toAdd.toList.traverse(addMemberToGoogleGroup(group, samRequestContext))
removedUserSyncReports <- toRemove.toList.traverse(removeMemberFromGoogleGroup(group, samRequestContext))

_ <- directoryDAO.updateSynchronizedDate(groupId, samRequestContext)
_ <- directoryDAO.updateSynchronizedDateAndVersion(group, samRequestContext)
} yield Map(group.email -> Seq(addedUserSyncReports, removedUserSyncReports).flatten) ++ subGroupSyncs.flatten
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,9 @@ consistent "has a" relationship is tracked by this ticket: https://broadworkbenc
roles: Set[ResourceRoleName],
actions: Set[ResourceAction],
descendantPermissions: Set[AccessPolicyDescendantPermissions],
public: Boolean
public: Boolean,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
) extends WorkbenchGroup

@Lenses final case class AccessPolicyDescendantPermissions(resourceType: ResourceTypeName, actions: Set[ResourceAction], roles: Set[ResourceRoleName])
Expand All @@ -231,10 +233,18 @@ consistent "has a" relationship is tracked by this ticket: https://broadworkbenc
email: WorkbenchEmail,
roles: Set[ResourceRoleName],
actions: Set[ResourceAction],
public: Boolean
public: Boolean,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
)

@Lenses final case class BasicWorkbenchGroup(id: WorkbenchGroupName, members: Set[WorkbenchSubject], email: WorkbenchEmail) extends WorkbenchGroup
@Lenses final case class BasicWorkbenchGroup(
id: WorkbenchGroupName,
members: Set[WorkbenchSubject],
email: WorkbenchEmail,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
Copy link
Contributor Author

@Ghost-in-a-Jar Ghost-in-a-Jar Jul 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Defaulting these values (including in the other models) was to avoid having to change BasicWorkbenchGroup in a ton of places. With the proper tests in place I think this is ok.

) extends WorkbenchGroup
object BasicWorkbenchGroup {
def apply(workbenchGroup: WorkbenchGroup): BasicWorkbenchGroup =
workbenchGroup.id match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ class ManagedGroupService(
val workbenchGroupName = WorkbenchGroupName(resource.resourceId.value)
val groupMembers: Set[WorkbenchSubject] = componentPolicies.collect {
// collect only member and admin policies
case AccessPolicy(id @ FullyQualifiedPolicyId(_, ManagedGroupService.memberPolicyName | ManagedGroupService.adminPolicyName), _, _, _, _, _, _) => id
case AccessPolicy(id @ FullyQualifiedPolicyId(_, ManagedGroupService.memberPolicyName | ManagedGroupService.adminPolicyName), _, _, _, _, _, _, _, _) =>
id
}
directoryDAO.createGroup(BasicWorkbenchGroup(workbenchGroupName, groupMembers, email), accessInstructionsOpt, samRequestContext = samRequestContext)
}
Expand Down
Loading
Loading