Skip to content

Commit

Permalink
Merge pull request #369 from CJSCommonPlatform/fix-snapshot-regenerat…
Browse files Browse the repository at this point in the history
…ion-bean

Fix error in RegenerateAggregateSnapshotBean where a closed java Stream is reused
  • Loading branch information
allanmckenzie authored Dec 14, 2024
2 parents 6a90bfe + 6b9d71b commit 3c21a78
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 7 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
[Semantic Versioning](http://semver.org/).

### [Unreleased]

## [17.100.8] - 2024-12-14
### Fixed
- Fixed error in RegenerateAggregateSnapshotBean where a closed java Stream was reused

## [17.100.7] - 2024-11-27
### Changed
- Jmx MBean `SystemCommanderMBean` now only takes basic Java Objects to keep the JMX handling interoperable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@
@TransactionManagement(TransactionManagementType.BEAN)
public class RegenerateAggregateSnapshotBean {

static final Stream<JsonEnvelope> EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION = empty();

@Inject
private SnapshotAwareAggregateService snapshotAwareAggregateService;

Expand Down Expand Up @@ -58,7 +56,8 @@ public void runAggregateSnapshotRegeneration(final UUID streamId, final String a
final EventStream eventStream = eventSource.getStreamById(streamId);
final Aggregate aggregate = snapshotAwareAggregateService.get(eventStream, aggregateClass);
// to save, let's append
eventStream.append(EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION);
final Stream<JsonEnvelope> empty = empty();
eventStream.append(empty);

logger.info(format("'%s' hydrated with all events for streamId '%s'", aggregate.getClass().getName(), streamId));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.when;
import static uk.gov.justice.services.eventstore.management.aggregate.snapshot.regeneration.commands.RegenerateAggregateSnapshotBean.EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION;

import uk.gov.justice.domain.aggregate.Aggregate;
import uk.gov.justice.services.core.aggregate.SnapshotAwareAggregateService;
Expand All @@ -18,11 +18,13 @@
import uk.gov.justice.services.eventsourcing.source.core.exception.EventStreamException;

import java.util.UUID;
import java.util.stream.Stream;

import javax.transaction.UserTransaction;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
import org.mockito.InjectMocks;
import org.mockito.Mock;
Expand Down Expand Up @@ -73,14 +75,18 @@ public void shouldGetAggregateByNameAndStreamIdInOrderToGenerateSnapshots() thro

regenerateAggregateSnapshotBean.runAggregateSnapshotRegeneration(streamId, aggregateClassName);

final ArgumentCaptor<Stream> streamArgumentCaptor = ArgumentCaptor.forClass(Stream.class);

final InOrder inOrder = inOrder(userTransaction, logger, eventStream, eventSource);
inOrder.verify(userTransaction).setTransactionTimeout(transactionTimoutSeconds);
inOrder.verify(userTransaction).begin();
inOrder.verify(logger).info("Hydrating aggregate 'uk.gov.justice.services.eventstore.management.aggregate.snapshot.regeneration.commands.SomeAggregate' for streamId '6c01bc31-9ee2-4a3c-bdd2-6f20a619bae8'");
inOrder.verify(eventSource).getStreamById(streamId);
inOrder.verify(eventStream).append(EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION);
inOrder.verify(eventStream).append(streamArgumentCaptor.capture());
inOrder.verify(logger).info("'uk.gov.justice.services.eventstore.management.aggregate.snapshot.regeneration.commands.SomeAggregate' hydrated with all events for streamId '6c01bc31-9ee2-4a3c-bdd2-6f20a619bae8'");
inOrder.verify(userTransaction).commit();

assertThat(streamArgumentCaptor.getValue().toList().isEmpty(), is(true));
}

@SuppressWarnings({"rawtypes", "unchecked"})
Expand All @@ -101,7 +107,7 @@ public void shouldThrowAggregateSnapshotGenerationFailedExceptionIfSnapshotGener
when(aggregateClassProvider.toClass(aggregateClassName)).thenReturn(aggregateClass);
when(eventSource.getStreamById(streamId)).thenReturn(eventStream);
when(snapshotAwareAggregateService.get(eventStream, aggregateClass)).thenReturn(aggregate);
doThrow(eventStreamException).when(eventStream).append(EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION);
doThrow(eventStreamException).when(eventStream).append(any(Stream.class));

final AggregateSnapshotGenerationFailedException aggregateSnapshotGenerationFailedException = assertThrows(
AggregateSnapshotGenerationFailedException.class,
Expand All @@ -112,14 +118,18 @@ public void shouldThrowAggregateSnapshotGenerationFailedExceptionIfSnapshotGener
assertThat(aggregateSnapshotGenerationFailedException.getMessage(), is("Snapshot generation failed for 'uk.gov.justice.services.eventstore.management.aggregate.snapshot.regeneration.commands.SomeAggregate': streamId '6c01bc31-9ee2-4a3c-bdd2-6f20a619bae8'"));
assertThat(aggregateSnapshotGenerationFailedException.getCause(), is(eventStreamException));

final ArgumentCaptor<Stream> streamArgumentCaptor = ArgumentCaptor.forClass(Stream.class);

final InOrder inOrder = inOrder(userTransaction, logger, eventStream, eventSource);
inOrder.verify(userTransaction).setTransactionTimeout(transactionTimoutSeconds);
inOrder.verify(userTransaction).begin();
inOrder.verify(logger).info("Hydrating aggregate 'uk.gov.justice.services.eventstore.management.aggregate.snapshot.regeneration.commands.SomeAggregate' for streamId '6c01bc31-9ee2-4a3c-bdd2-6f20a619bae8'");
inOrder.verify(eventSource).getStreamById(streamId);
inOrder.verify(eventStream).append(EMPTY_JSON_ENVELOPE_STREAM_TO_FORCE_SNAPSHOT_GENERATION);
inOrder.verify(eventStream).append(streamArgumentCaptor.capture());
inOrder.verify(userTransaction).rollback();

inOrder.verify(userTransaction, never()).commit();

assertThat(streamArgumentCaptor.getValue().toList().isEmpty(), is(true));
}
}

0 comments on commit 3c21a78

Please sign in to comment.