Skip to content

Commit

Permalink
Merge pull request #2062 from scireum/feature/fha/SE-14230-Event-Streaming-Hotfix
Browse files Browse the repository at this point in the history

SE-14230: Cherry Picks Event Streaming for Hotfix
  • Loading branch information
fhaScireum authored Dec 12, 2024
2 parents 2883f89 + 9a33ec9 commit 9a36a28
Show file tree
Hide file tree
Showing 7 changed files with 249 additions and 8 deletions.
40 changes: 40 additions & 0 deletions src/main/java/sirius/biz/analytics/events/EventRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,15 @@
import java.time.LocalTime;
import java.time.YearMonth;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

/**
* Responsible for collecting and storing {@link Event events} for analytical and statistical purposes.
Expand Down Expand Up @@ -539,4 +543,40 @@ protected Event<?> fetchBufferedEvent() {

return result;
}

/// Streams all user events matching the given query, fetching them block by block.
///
/// To avoid delivering an event twice at block boundaries, the scope, tenant and user id stored in the user data are
/// used as distinct fields. This relies on the assumption that a user can only trigger one event (of the type in
/// question) at the same time.
///
/// @param query the query to execute
/// @param <E>   the type of the events to fetch
/// @return a stream of events which match the given query
public <E extends Event<E> & UserEvent> Stream<E> fetchUserEventsBlockwise(SmartQuery<E> query) {
    // Together, these three fields identify the user which triggered an event.
    List<Mapping> userFields = List.of(UserEvent.USER_DATA.inner(UserData.SCOPE_ID),
                                       UserEvent.USER_DATA.inner(UserData.TENANT_ID),
                                       UserEvent.USER_DATA.inner(UserData.USER_ID));

    return fetchEventsBlockwise(query, userFields);
}

/// Streams all events matching the given query block by block, using a custom duplicate preventer.
///
/// The returned stream is sequential and backed by an [EventSpliterator].
///
/// @param query              the query to execute
/// @param duplicatePreventer the callback which extends the query so that already fetched events are not fetched
///                           again
/// @param <E>                the type of the events to fetch
/// @return a stream of events which match the given query
/// @see EventSpliterator for a detailed explanation of the duplicate preventer
public <E extends Event<E>> Stream<E> fetchEventsBlockwise(SmartQuery<E> query,
                                                           BiConsumer<SmartQuery<E>, List<E>> duplicatePreventer) {
    EventSpliterator<E> spliterator = new EventSpliterator<>(query, duplicatePreventer);

    // "false" requests a sequential (non-parallel) stream.
    return StreamSupport.stream(spliterator, false);
}

/// Streams all events matching the given query block by block, using the given fields to detect duplicates.
///
/// The returned stream is sequential and backed by an [EventSpliterator].
///
/// @param query          the query to execute
/// @param distinctFields the fields which, together with the event timestamp, identify an already fetched event
/// @param <E>            the type of the events to fetch
/// @return a stream of events which match the given query
/// @see EventSpliterator#EventSpliterator(SmartQuery, List) for a detailed explanation of the distinct fields
public <E extends Event<E>> Stream<E> fetchEventsBlockwise(SmartQuery<E> query, List<Mapping> distinctFields) {
    EventSpliterator<E> spliterator = new EventSpliterator<>(query, distinctFields);

    // "false" requests a sequential (non-parallel) stream.
    return StreamSupport.stream(spliterator, false);
}
}
169 changes: 169 additions & 0 deletions src/main/java/sirius/biz/analytics/events/EventSpliterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/

package sirius.biz.analytics.events;

import sirius.db.jdbc.OMA;
import sirius.db.jdbc.SmartQuery;
import sirius.db.jdbc.constraints.SQLConstraint;
import sirius.db.mixing.Mapping;
import sirius.kernel.async.TaskContext;
import sirius.kernel.commons.PullBasedSpliterator;

import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;

/// Provides a spliterator which fetches events block by block instead of loading them all at once.
///
/// The spliterator is pull-based: once the current block is exhausted, the next block is queried. As events have no
/// natural order or distinct fields by default, and as multiple events of the same type can be recorded at the same
/// time, each follow-up block starts at (and includes) the timestamp of the last event of the previous block. Without
/// further measures, the events recorded at this boundary timestamp would be fetched again.
///
/// This is the job of the "duplicate preventer": a consumer which is supplied with the query and the relevant events
/// of the previous block. It must extend the query with constraints on fields that are unique to the event type (e.g.
/// [UserData#USER_ID] in case of [user events][UserEvent]) so that these events are not fetched again.
///
/// Note that only events sharing the timestamp of the last fetched event are supplied to the preventer. All preceding
/// events cannot be fetched again anyway, as the next fetch starts at the timestamp of the last event.
///
/// In most cases a duplicate preventer should add a constraint of the form
/// `AND NOT (eventTimestamp = X AND (A = Y OR A = Z OR ...))` to the query. Here, X is the timestamp of the last event
/// of the preceding block, A is a distinct field, and Y and Z are the values of this field in the supplied events. See
/// [EventRecorder#fetchUserEventsBlockwise(SmartQuery)] for an example which combines [Event#EVENT_TIMESTAMP] with the
/// user specific fields [UserData#SCOPE_ID], [UserData#TENANT_ID] and [UserData#USER_ID] to avoid fetching the same
/// event triggered by the same user at the same time again.
///
/// Event deduplication may result in complex queries, which can potentially slow down performance or generate queries
/// that are too large to process. Therefore, if individual events are not needed, using
/// [metrics][sirius.kernel.health.metrics.Metric] may be a better choice for fetching aggregated data.
///
/// @param <E> the type of the events to fetch
public class EventSpliterator<E extends Event<E>> extends PullBasedSpliterator<E> {

    /// Determines the maximal number of events fetched per query.
    private static final int BLOCK_SIZE = 1000;

    /// Contains the ordered and limited copy of the query supplied by the caller. It is copied again for each block.
    private final SmartQuery<E> query;

    /// Contains the previously fetched events which are used to determine where the next block starts.
    private final List<E> lastEvents = new ArrayList<>();

    /// Contains the consumer which extends a query so that already fetched events are not fetched again.
    private final BiConsumer<SmartQuery<E>, List<E>> duplicatePreventer;

    /// Creates a new spliterator for the given query and duplicate preventer.
    ///
    /// The given query is copied, so the caller may re-use it. It also does not need to provide ordering or limits, as
    /// the spliterator orders by [Event#EVENT_TIMESTAMP] and limits the block size itself.
    ///
    /// The given duplicate preventer must add additional constraints to the query to prevent fetching the same events
    /// multiple times.
    ///
    /// @param query              the query to use to fetch the events
    /// @param duplicatePreventer a consumer which is used to prevent fetching the same events multiple times
    /// @see EventSpliterator the class description for more information
    public EventSpliterator(SmartQuery<E> query, BiConsumer<SmartQuery<E>, List<E>> duplicatePreventer) {
        this.duplicatePreventer = duplicatePreventer;
        this.query = query.copy().orderAsc(Event.EVENT_TIMESTAMP).limit(BLOCK_SIZE);
    }

    /// Creates a new spliterator for the given query which uses the given fields to detect duplicates.
    ///
    /// The given query is copied, so the caller may re-use it. It also does not need to provide ordering or limits, as
    /// this is handled by the spliterator itself.
    ///
    /// An event is skipped if its timestamp and the values of all distinct fields match those of one of the previously
    /// fetched events.
    ///
    /// The duplicate preventing portion of the SQL query will have the form:
    /// ```sql
    /// AND NOT (timestamp = last_timestamp -- Only constrain events with the same timestamp
    ///     AND ((field1 = event_1_field_1 AND field2 = event_1_field_2 AND ...) -- Ignore already fetched event 1
    ///         OR (field1 = event_2_field_1 AND field2 = event_2_field_2 AND ...) -- Ignore already fetched event 2
    ///         OR ...))
    /// ```
    ///
    /// @param query          the query to use to fetch the events
    /// @param distinctFields the fields to consider when preventing duplicates
    /// @see EventSpliterator the class description for more information
    public EventSpliterator(SmartQuery<E> query, List<Mapping> distinctFields) {
        this(query,
             (effectiveQuery, events) -> effectiveQuery.where(createDuplicatePreventerConstraint(events,
                                                                                                 distinctFields)));
    }

    /// Builds the `NOT (timestamp = X AND (event 1 OR event 2 OR ...))` constraint for the given events.
    ///
    /// The timestamp is taken from the last of the given events, therefore the list must not be empty.
    private static <E extends Event<E>> SQLConstraint createDuplicatePreventerConstraint(List<E> events,
                                                                                         List<Mapping> distinctFields) {
        SQLConstraint sameTimestamp = OMA.FILTERS.eq(Event.EVENT_TIMESTAMP, events.getLast().getEventTimestamp());
        SQLConstraint alreadyFetched = createEventsConstraint(events, distinctFields);

        return OMA.FILTERS.not(OMA.FILTERS.and(sameTimestamp, alreadyFetched));
    }

    /// Builds a constraint which matches any of the given events based on the distinct fields.
    private static <E extends Event<E>> SQLConstraint createEventsConstraint(List<E> events,
                                                                             List<Mapping> distinctFields) {
        List<SQLConstraint> eventConstraints =
                events.stream().map(event -> createFieldsConstraint(event, distinctFields)).toList();

        return OMA.FILTERS.or(eventConstraints);
    }

    /// Builds a constraint which requires all distinct fields to equal the values found in the given event.
    private static <E extends Event<E>> SQLConstraint createFieldsConstraint(E event, List<Mapping> distinctFields) {
        List<SQLConstraint> fieldConstraints = new ArrayList<>(distinctFields.size());
        for (Mapping field : distinctFields) {
            // The value to compare against is read from the event via the property of the same name.
            Object value = event.getDescriptor().findProperty(field.getName()).getValue(event);
            fieldConstraints.add(OMA.FILTERS.eq(field, value));
        }

        return OMA.FILTERS.and(fieldConstraints);
    }

    /// Determines if both events carry an equal event timestamp.
    private static <T extends Event<T>> boolean haveSameTimestamp(T first, T second) {
        return first.getEventTimestamp().equals(second.getEventTimestamp());
    }

    /// Fetches the next block of events.
    ///
    /// Yields `null` if the current task is no longer active or if the query returns no more events.
    /// NOTE(review): `null` presumably signals the end of the data to [PullBasedSpliterator] - confirm there.
    @Nullable
    @Override
    protected Iterator<E> pullNextBlock() {
        if (!TaskContext.get().isActive()) {
            return null;
        }

        List<E> block = resolveEffectiveQuery().queryList();
        if (block.isEmpty()) {
            return null;
        }

        // Once the block ends on a new timestamp, the remembered events are no longer needed to detect duplicates.
        // Otherwise they are kept, as the next query starts at the very same timestamp again.
        boolean timestampAdvanced = !lastEvents.isEmpty() && !haveSameTimestamp(lastEvents.getLast(), block.getLast());
        if (timestampAdvanced) {
            lastEvents.clear();
        }

        lastEvents.addAll(block);

        return block.iterator();
    }

    @Override
    public int characteristics() {
        return ORDERED | NONNULL | IMMUTABLE;
    }

    /// Creates the query for the next block.
    ///
    /// For the first block this is a plain copy of the base query. For all following blocks, the query starts at the
    /// timestamp of the last fetched event and is passed to the duplicate preventer along with all remembered events
    /// sharing this timestamp.
    private SmartQuery<E> resolveEffectiveQuery() {
        SmartQuery<E> effectiveQuery = query.copy();
        if (lastEvents.isEmpty()) {
            return effectiveQuery;
        }

        E newestEvent = lastEvents.getLast();
        effectiveQuery.where(OMA.FILTERS.gte(Event.EVENT_TIMESTAMP, newestEvent.getEventTimestamp()));

        // Only events sharing the boundary timestamp can be fetched again, so only these are handed over.
        List<E> boundaryEvents =
                lastEvents.reversed().stream().filter(event -> haveSameTimestamp(event, newestEvent)).toList();
        duplicatePreventer.accept(effectiveQuery, boundaryEvents);

        return effectiveQuery;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* @see EventRecorder
* @see #withAggregationUrl(String)
*/
public class PageImpressionEvent extends Event<PageImpressionEvent> {
public class PageImpressionEvent extends Event<PageImpressionEvent> implements UserEvent {

/**
* Contains a generic or shortened URI which can be used to aggregate on.
Expand Down Expand Up @@ -82,7 +82,6 @@ public class PageImpressionEvent extends Event<PageImpressionEvent> {
/**
* Contains the current user, tenant and scope if available.
*/
public static final Mapping USER_DATA = Mapping.named("userData");
private final UserData userData = new UserData();

/**
Expand Down Expand Up @@ -168,6 +167,7 @@ public String getAggregationUri() {
return aggregationUri;
}

@Override
public UserData getUserData() {
return userData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@
*
* @see sirius.biz.tenants.TenantUserManager#recordUserActivityEvent(UserInfo)
*/
public class UserActivityEvent extends Event<UserActivityEvent> {
public class UserActivityEvent extends Event<UserActivityEvent> implements UserEvent {

/**
* Contains the current user, tenant and scope if available.
*/
public static final Mapping USER_DATA = Mapping.named("userData");
private final UserData userData = new UserData();

/**
Expand All @@ -38,6 +37,7 @@ public class UserActivityEvent extends Event<UserActivityEvent> {
public static final Mapping WEB_DATA = Mapping.named("webData");
private final WebData webData = new WebData();

@Override
public UserData getUserData() {
return userData;
}
Expand Down
29 changes: 29 additions & 0 deletions src/main/java/sirius/biz/analytics/events/UserEvent.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Made with all the love in the world
* by scireum in Remshalden, Germany
*
* Copyright by scireum GmbH
* http://www.scireum.de - info@scireum.de
*/

package sirius.biz.analytics.events;

import sirius.db.mixing.Mapping;

/**
* May be implemented by an {@link Event} to provide access to the {@link UserData} it carries.
* <p>
* Next to the accessor, this interface supplies the shared {@link #USER_DATA} mapping, so that code which handles
* arbitrary user events (e.g. {@link EventRecorder#fetchUserEventsBlockwise}) can address the user specific fields
* without knowing the concrete event type.
*/
public interface UserEvent {

/**
* Names the <tt>userData</tt> field which contains the user data of the event.
* <p>
* Use {@link Mapping#inner(Mapping)} to address a nested field, e.g.
* <tt>USER_DATA.inner(UserData.USER_ID)</tt>.
*/
Mapping USER_DATA = Mapping.named("userData");

/**
* Returns the user data of the event.
*
* @return the user data of the event
*/
UserData getUserData();
}
5 changes: 3 additions & 2 deletions src/main/java/sirius/biz/tycho/updates/UpdateClickEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,22 @@

import sirius.biz.analytics.events.Event;
import sirius.biz.analytics.events.UserData;
import sirius.biz.analytics.events.UserEvent;
import sirius.db.mixing.Mapping;

/**
* Records a click on an {@link UpdateInfo}.
*
* @see UpdateManager
*/
public class UpdateClickEvent extends Event<UpdateClickEvent> {
public class UpdateClickEvent extends Event<UpdateClickEvent> implements UserEvent {

public static final Mapping UPDATE_GUID = Mapping.named("updateGuid");
private String updateGuid;

/**
* Contains the current user, tenant and scope if available.
*/
public static final Mapping USER_DATA = Mapping.named("userData");
private final UserData userData = new UserData();

/**
Expand All @@ -43,6 +43,7 @@ public String getUpdateGuid() {
return updateGuid;
}

@Override
public UserData getUserData() {
return userData;
}
Expand Down
6 changes: 4 additions & 2 deletions src/main/java/sirius/biz/util/SOAPCallEvent.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,19 @@

import sirius.biz.analytics.events.Event;
import sirius.biz.analytics.events.UserData;
import sirius.biz.analytics.events.UserEvent;
import sirius.db.mixing.Mapping;
import sirius.db.mixing.annotations.NullAllowed;
import sirius.db.mixing.annotations.Trim;

/**
* Record a SOAP call performed by {@link sirius.biz.util.MonitoredSOAPClient}.
*/
public class SOAPCallEvent extends Event<SOAPCallEvent> {
public class SOAPCallEvent extends Event<SOAPCallEvent> implements UserEvent {

/**
* Contains the shop, customer and user which triggered the event.
*/
public static final Mapping USER_DATA = Mapping.named("userData");
private final UserData userData = new UserData();

/**
Expand Down Expand Up @@ -155,6 +156,7 @@ public boolean isErroneous() {
return erroneous;
}

@Override
public UserData getUserData() {
return userData;
}
Expand Down

0 comments on commit 9a36a28

Please sign in to comment.