Skip to content

Commit

Permalink
Merge branch 'develop' into limit_syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
dvoet authored Jul 2, 2024
2 parents baf94ca + 6e240d2 commit 9e8a54c
Show file tree
Hide file tree
Showing 21 changed files with 238 additions and 62 deletions.
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ object Dependencies {
val postgresDriverVersion = "42.7.2"
val sentryVersion = "6.15.0"

val workbenchLibV = "a6ad7dc" // If updating this, make sure googleStorageLocal in test dependencies is up-to-date
val workbenchLibV = "8d55689" // If updating this, make sure googleStorageLocal in test dependencies is up-to-date
val workbenchUtilV = s"0.10-$workbenchLibV"
val workbenchUtil2V = s"0.9-$workbenchLibV"
val workbenchModelV = s"0.19-$workbenchLibV"
val workbenchModelV = s"0.20-$workbenchLibV"
val workbenchGoogleV = s"0.32-$workbenchLibV"
val workbenchGoogle2V = s"0.36-$workbenchLibV"
val workbenchNotificationsV = s"0.6-$workbenchLibV"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@
<include file="changesets/20231019_sam_user_attributes_table.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240417_action_managed_identities.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240416_add_sam_rac_tables.xml" relativeToChangelogFile="true"/>
<include file="changesets/20240701_add_group_version_and_last_synchronized_version.xml" relativeToChangelogFile="true"/>
</databaseChangeLog>
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog logicalFilePath="dummy"
xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
xmlns:ext="http://www.liquibase.org/xml/ns/dbchangelog-ext"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog-ext http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-ext.xsd http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.4.xsd">

<changeSet logicalFilePath="dummy" author="tgarwood" id="add_group_version_and_last_synchronized_version">

<addColumn tableName="sam_group">
<!-- Default value initially set to 1 for all existing records -->
<column name="version" type="BIGINT" defaultValue="1">
<constraints nullable="false"/>
</column>
</addColumn>

<!-- Default value initially set to null for all existing records -->
<addColumn tableName="sam_group">
<column name="last_synchronized_version" type="BIGINT">
<constraints nullable="true"/>
</column>
</addColumn>

</changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ trait DirectoryDAO {

def isGroupMember(groupId: WorkbenchGroupIdentity, member: WorkbenchSubject, samRequestContext: SamRequestContext): IO[Boolean]

def updateSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit]
def updateSynchronizedDateAndVersion(group: WorkbenchGroup, samRequestContext: SamRequestContext): IO[Unit]

def updateGroupUpdatedDateAndVersionWithSession(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit]

def getSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Option[Date]]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,7 @@ class PostgresAccessPolicyDAO(
}
removeAllGroupMembers(groupId)
insertGroupMembers(groupId, memberList)
updateGroupUpdatedDateAndVersion(id)
}

override def overwritePolicy(newPolicy: AccessPolicy, samRequestContext: SamRequestContext): IO[AccessPolicy] =
Expand Down Expand Up @@ -1111,7 +1112,7 @@ class PostgresAccessPolicyDAO(
val ra = ResourceActionTable.syntax("ra")

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${rr.result.role}, ${ra.result.action}
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${rr.result.role}, ${ra.result.action}
from ${GroupTable as g}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
join ${ResourceTable as r} on ${p.resourceId} = ${r.id}
Expand All @@ -1133,7 +1134,9 @@ class PostgresAccessPolicyDAO(
rs.get[ResourceId](r.resultName.name),
rs.get[ResourceTypeName](rt.resultName.name),
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(rs.stringOpt(rr.resultName.role).map(ResourceRoleName(_)), rs.stringOpt(ra.resultName.action).map(ResourceAction(_)))
)
Expand Down Expand Up @@ -1190,7 +1193,9 @@ class PostgresAccessPolicyDAO(
policyRoles,
policyActions,
policyDescendantPermissions,
policyInfo.public
policyInfo.public,
policyInfo.version,
policyInfo.lastSynchronizedVersion
)
}
.to(LazyList)
Expand Down Expand Up @@ -1242,7 +1247,7 @@ class PostgresAccessPolicyDAO(
val part = ResourceTypeTable.syntax("part") // policy action resource type

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${g.result.email}, ${p.result.public}, ${prrt.result.name}, ${rr.result.role}, ${pr.result.descendantsOnly}, ${part.result.name}, ${ra.result.action}, ${pa.result.descendantsOnly}
samsql"""select ${p.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${prrt.result.name}, ${rr.result.role}, ${pr.result.descendantsOnly}, ${part.result.name}, ${ra.result.action}, ${pa.result.descendantsOnly}
from ${GroupTable as g}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
left join ${PolicyRoleTable as pr} on ${p.id} = ${pr.resourcePolicyId}
Expand All @@ -1261,7 +1266,9 @@ class PostgresAccessPolicyDAO(
resource.resourceId,
resource.resourceTypeName,
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(
RoleResult(
Expand Down Expand Up @@ -1496,7 +1503,7 @@ class PostgresAccessPolicyDAO(
val ra = ResourceActionTable.syntax("ra")

val listPoliciesQuery =
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${rr.result.role}, ${ra.result.action}
samsql"""select ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.email}, ${p.result.public}, ${g.result.version}, ${g.result.lastSynchronizedVersion}, ${rr.result.role}, ${ra.result.action}
from ${GroupTable as g}
join ${GroupMemberFlatTable as f} on ${f.groupId} = ${g.id}
join ${PolicyTable as p} on ${g.id} = ${p.groupId}
Expand All @@ -1517,7 +1524,9 @@ class PostgresAccessPolicyDAO(
rs.get[ResourceId](r.resultName.name),
rs.get[ResourceTypeName](rt.resultName.name),
rs.get[WorkbenchEmail](g.resultName.email),
rs.boolean(p.resultName.public)
rs.boolean(p.resultName.public),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
),
(rs.stringOpt(rr.resultName.role).map(ResourceRoleName(_)), rs.stringOpt(ra.resultName.action).map(ResourceAction(_)))
)
Expand Down Expand Up @@ -2084,7 +2093,15 @@ class PostgresAccessPolicyDAO(
}
}

private final case class PolicyInfo(name: AccessPolicyName, resourceId: ResourceId, resourceTypeName: ResourceTypeName, email: WorkbenchEmail, public: Boolean)
private final case class PolicyInfo(
name: AccessPolicyName,
resourceId: ResourceId,
resourceTypeName: ResourceTypeName,
email: WorkbenchEmail,
public: Boolean,
version: Int,
lastSynchronizedVersion: Option[Int]
)
private final case class RoleResult(resourceTypeName: Option[ResourceTypeName], role: Option[ResourceRoleName], descendantsOnly: Option[Boolean])
private final case class ActionResult(resourceTypeName: Option[ResourceTypeName], action: Option[ResourceAction], descendantsOnly: Option[Boolean])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
val r = ResourceTable.syntax("r")
val rt = ResourceTypeTable.syntax("rt")

samsql"""select ${g.result.email}, ${gm.result.memberUserId}, ${sg.result.name}, ${p.result.name}, ${r.result.name}, ${rt.result.name}
samsql"""select ${g.result.email}, ${gm.result.memberUserId}, ${sg.result.name}, ${p.result.name}, ${r.result.name}, ${rt.result.name}, ${g.result.version}, ${g.result.lastSynchronizedVersion}
from ${GroupTable as g}
left join ${GroupMemberTable as gm} on ${g.id} = ${gm.groupId}
left join ${GroupTable as sg} on ${gm.memberGroupId} = ${sg.id}
Expand All @@ -100,7 +100,9 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
rs.stringOpt(sg.resultName.name).map(WorkbenchGroupName),
rs.stringOpt(p.resultName.name).map(AccessPolicyName(_)),
rs.stringOpt(r.resultName.name).map(ResourceId(_)),
rs.stringOpt(rt.resultName.name).map(ResourceTypeName(_))
rs.stringOpt(rt.resultName.name).map(ResourceTypeName(_)),
rs.get[Int](g.resultName.version),
rs.get[Option[Int]](g.resultName.lastSynchronizedVersion)
)
}
.list()
Expand All @@ -112,13 +114,16 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
} else {
val email = results.head._1
val members: Set[WorkbenchSubject] = results.collect {
case (_, Some(userId), None, None, None, None) => userId
case (_, None, Some(subGroupName), None, None, None) => subGroupName
case (_, None, Some(_), Some(policyName), Some(resourceName), Some(resourceTypeName)) =>
case (_, Some(userId), None, None, None, None, _, _) => userId
case (_, None, Some(subGroupName), None, None, None, _, _) => subGroupName
case (_, None, Some(_), Some(policyName), Some(resourceName), Some(resourceTypeName), _, _) =>
FullyQualifiedPolicyId(FullyQualifiedResourceId(resourceTypeName, resourceName), policyName)
}.toSet

Option(BasicWorkbenchGroup(groupName, members, email))
val version = results.head._7
val lastSynchronized = results.head._8

Option(BasicWorkbenchGroup(groupName, members, email, version, lastSynchronized))
}

override def loadGroupEmail(groupName: WorkbenchGroupName, samRequestContext: SamRequestContext): IO[Option[WorkbenchEmail]] =
Expand Down Expand Up @@ -154,7 +159,7 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
serializableWriteTransaction("addGroupMember", samRequestContext) { implicit session =>
val numberAdded = insertGroupMembers(queryForGroupPKs(Set(groupId)).head, Set(addMember))
if (numberAdded > 0) {
updateGroupUpdatedDate(groupId)
updateGroupUpdatedDateAndVersion(groupId)
true
} else {
false
Expand All @@ -169,21 +174,34 @@ class PostgresDirectoryDAO(protected val writeDbRef: DbReference, protected val
val removed = removeGroupMember(groupId, removeMember)

if (removed) {
updateGroupUpdatedDate(groupId)
updateGroupUpdatedDateAndVersion(groupId)
}

removed
}

override def updateGroupUpdatedDateAndVersionWithSession(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit] =
serializableWriteTransaction("updateGroupUpdatedDateAndVersionWithSession", samRequestContext) { implicit session =>
updateGroupUpdatedDateAndVersion(groupId)
}

override def isGroupMember(groupId: WorkbenchGroupIdentity, member: WorkbenchSubject, samRequestContext: SamRequestContext): IO[Boolean] =
readOnlyTransaction("isGroupMember", samRequestContext) { implicit session =>
isGroupMember(groupId, member)
}

override def updateSynchronizedDate(groupId: WorkbenchGroupIdentity, samRequestContext: SamRequestContext): IO[Unit] =
serializableWriteTransaction("updateSynchronizedDate", samRequestContext) { implicit session =>
/*
Update last synchronized version only when it is less than the current group version. This is to avoid
threads stepping over each other and causing sam to become out of sync with google. The last synchronized version
should only be set to the version of the group that is help in memory from when the sync started.
*/
override def updateSynchronizedDateAndVersion(group: WorkbenchGroup, samRequestContext: SamRequestContext): IO[Unit] =
serializableWriteTransaction("updateSynchronizedDateAndVersion", samRequestContext) { implicit session =>
val g = GroupTable.column
samsql"update ${GroupTable.table} set ${g.synchronizedDate} = ${Instant.now()} where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})"
samsql"""update ${GroupTable.table}
set ${g.synchronizedDate} = ${Instant.now()},
${g.lastSynchronizedVersion} = ${group.version}
where ${g.id} = (${workbenchGroupIdentityToGroupPK(group.id)}) and COALESCE(${g.lastSynchronizedVersion}, 0) < ${group.version}"""
.update()
.apply()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,9 +250,12 @@ trait PostgresGroupDAO {
query.map(rs => rs.int(1)).single().apply().getOrElse(0) > 0
}

def updateGroupUpdatedDate(groupId: WorkbenchGroupIdentity)(implicit session: DBSession): Int = {
def updateGroupUpdatedDateAndVersion(groupId: WorkbenchGroupIdentity)(implicit session: DBSession): Int = {
val g = GroupTable.column
samsql"update ${GroupTable.table} set ${g.updatedDate} = ${Instant.now()} where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})".update().apply()
samsql"""update ${GroupTable.table}
set ${g.updatedDate} = ${Instant.now()},
${g.version} = ${g.version} + 1
where ${g.id} = (${workbenchGroupIdentityToGroupPK(groupId)})""".update().apply()
}

def deleteGroup(groupName: WorkbenchGroupName)(implicit session: DBSession): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,15 @@ import scalikejdbc._
import java.time.Instant

final case class GroupPK(value: Long) extends DatabaseKey
final case class GroupRecord(id: GroupPK, name: WorkbenchGroupName, email: WorkbenchEmail, updatedDate: Option[Instant], synchronizedDate: Option[Instant])
final case class GroupRecord(
id: GroupPK,
name: WorkbenchGroupName,
email: WorkbenchEmail,
version: Int,
lastSynchronizedVersion: Option[Int],
updatedDate: Option[Instant],
synchronizedDate: Option[Instant]
)

object GroupTable extends SQLSyntaxSupportWithDefaultSamDB[GroupRecord] {
override def tableName: String = "SAM_GROUP"
Expand All @@ -17,6 +25,8 @@ object GroupTable extends SQLSyntaxSupportWithDefaultSamDB[GroupRecord] {
rs.get(e.id),
rs.get(e.name),
rs.get(e.email),
rs.get(e.version),
rs.get(e.lastSynchronizedVersion),
rs.timestampOpt(e.updatedDate).map(_.toInstant),
rs.timestampOpt(e.synchronizedDate).map(_.toInstant)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ class GoogleGroupSyncMessageReceiver(groupSynchronizer: GoogleGroupSynchronizer)
logger.info(s"group to synchronize not found: ${groupNotFound.errorReport}")
consumer.ack()

case groupAlreadySynchronized: GroupAlreadySynchronized =>
// this can happen if a group is synchronized multiple times in quick succession
// acknowledge it so we don't have to handle it again
logger.info(s"group already previously synchronized: ${groupAlreadySynchronized.getMessage}")
consumer.ack()

case regrets: Throwable =>
logger.error("failure synchronizing google group", regrets)
consumer.nack() // redeliver message to hopefully rectify the failure
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ import org.broadinstitute.dsde.workbench.sam.util.OpenTelemetryIOUtils._
import org.broadinstitute.dsde.workbench.sam.util.SamRequestContext
import org.broadinstitute.dsde.workbench.util.FutureSupport

class GroupAlreadySynchronized(errorReport: ErrorReport = ErrorReport(StatusCodes.Conflict, "Group has already been synchronized"))
extends WorkbenchExceptionWithErrorReport(errorReport)

/** This class makes sure that our google groups have the right members.
*
* For the simple case it merely compares group membership given by directoryDAO against group membership given by googleDirectoryDAO and does the appropriate
Expand Down Expand Up @@ -51,7 +54,15 @@ class GoogleGroupSynchronizer(
IO.pure(Map.empty)
} else {
for {
group <- loadSamGroup(groupId, samRequestContext)
group: WorkbenchGroup <- loadSamGroup(groupId, samRequestContext)
// If group.version > group.lastSynchronizedVersion, then the group needs to be synchronized
// Else Noop
_ <-
if (group.version > group.lastSynchronizedVersion.getOrElse(0)) {
IO.unit
} else {
IO.raiseError(new GroupAlreadySynchronized)
}
members <- calculateAuthDomainIntersectionIfRequired(group, samRequestContext)
subGroupSyncs <- syncSubGroupsIfRequired(group, visitedGroups, samRequestContext)
googleMemberEmails <- loadGoogleGroupMemberEmailsMaybeCreateGroup(group, samRequestContext)
Expand All @@ -63,7 +74,7 @@ class GoogleGroupSynchronizer(
addedUserSyncReports <- toAdd.toList.traverse(addMemberToGoogleGroup(group, samRequestContext))
removedUserSyncReports <- toRemove.toList.traverse(removeMemberFromGoogleGroup(group, samRequestContext))

_ <- directoryDAO.updateSynchronizedDate(groupId, samRequestContext)
_ <- directoryDAO.updateSynchronizedDateAndVersion(group, samRequestContext)
} yield Map(group.email -> Seq(addedUserSyncReports, removedUserSyncReports).flatten) ++ subGroupSyncs.flatten
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,9 @@ consistent "has a" relationship is tracked by this ticket: https://broadworkbenc
roles: Set[ResourceRoleName],
actions: Set[ResourceAction],
descendantPermissions: Set[AccessPolicyDescendantPermissions],
public: Boolean
public: Boolean,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
) extends WorkbenchGroup

@Lenses final case class AccessPolicyDescendantPermissions(resourceType: ResourceTypeName, actions: Set[ResourceAction], roles: Set[ResourceRoleName])
Expand All @@ -233,10 +235,18 @@ consistent "has a" relationship is tracked by this ticket: https://broadworkbenc
email: WorkbenchEmail,
roles: Set[ResourceRoleName],
actions: Set[ResourceAction],
public: Boolean
public: Boolean,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
)

@Lenses final case class BasicWorkbenchGroup(id: WorkbenchGroupName, members: Set[WorkbenchSubject], email: WorkbenchEmail) extends WorkbenchGroup
@Lenses final case class BasicWorkbenchGroup(
id: WorkbenchGroupName,
members: Set[WorkbenchSubject],
email: WorkbenchEmail,
version: Int = 1,
lastSynchronizedVersion: Option[Int] = None
) extends WorkbenchGroup
object BasicWorkbenchGroup {
def apply(workbenchGroup: WorkbenchGroup): BasicWorkbenchGroup =
workbenchGroup.id match {
Expand Down
Loading

0 comments on commit 9e8a54c

Please sign in to comment.