Skip to content

Commit

Permalink
Clarify DistributedUniqueTimeProvider.currentTimeMillis() isn't inten…
Browse files Browse the repository at this point in the history
…d to be unique and add tests #622 (#623)

* Clarify DistributedUniqueTimeProvider.currentTimeMillis() isn't intended to be unique and add tests #622

* Clarify DistributedUniqueTimeProvider.currentTimeMillis() isn't intended to be unique and add tests #622

* Clarify DistributedUniqueTimeProvider.currentTimeMillis() isn't intended to be unique and add tests #622

* Revert removal of deprecated method
  • Loading branch information
peter-lawrey authored Apr 15, 2024
1 parent 4eef297 commit 26d7fbc
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,16 @@
import java.io.File;

/**
* A TimeProvider implementation that ensures unique timestamps across multiple systems using a predefined hostId.
* It extends SimpleCloseable and implements TimeProvider to provide functionality for managing the timestamps.
* This class manages unique timestamp distribution across different hosts/systems.
* Provides unique timestamps across multiple systems by incorporating a host identifier into the timestamp, for microsecond and nanosecond resolution timestamps.
* This class is particularly useful in distributed systems where clock synchronization and uniqueness
* across hosts are critical. It implements {@link TimeProvider}.
* <p>
* NOTE: {@link #currentTimeMillis()} is not unique, it is just a call to the underlying provider as there isn't enough resolution to include the hostId.
* <p>
*
* Each timestamp generated is guaranteed to be unique across all hosts participating in the system.
* The class uses a file-based mechanism to ensure that timestamps are not only unique across restarts
* but also across different JVM instances.
*/
public class DistributedUniqueTimeProvider extends SimpleCloseable implements TimeProvider {

Expand All @@ -47,6 +54,13 @@ public class DistributedUniqueTimeProvider extends SimpleCloseable implements Ti
private TimeProvider provider = SystemTimeProvider.INSTANCE;
private int hostId;

/**
* Constructs a {@link DistributedUniqueTimeProvider} for a specified hostId.
* This constructor initializes a file-based backend which is used to store and deduplicate timestamps.
*
* @param hostId the identifier for the host, must be non-negative
* @param unmonitor if true, disables the monitoring of the file and byte stores used internally
*/
private DistributedUniqueTimeProvider(@NonNegative int hostId, boolean unmonitor) {
hostId(hostId);
try {
Expand All @@ -66,14 +80,22 @@ private DistributedUniqueTimeProvider(@NonNegative int hostId, boolean unmonitor
}

/**
* Returns an instance of DistributedUniqueTimeProvider.
* Provides a singleton instance of DistributedUniqueTimeProvider using the default hostId.
* This method is thread-safe and uses lazy initialization.
*
* @return the single instance of DistributedUniqueTimeProvider
* @return the single, shared instance of DistributedUniqueTimeProvider
*/
public static DistributedUniqueTimeProvider instance() {
return DistributedUniqueTimeProviderHolder.INSTANCE;
}

/**
* Creates a new instance of DistributedUniqueTimeProvider for a specified hostId.
* This is useful in scenarios where multiple instances are required, each with a different hostId.
*
* @param hostId the host identifier for which the time provider is to be created, must be non-negative
* @return a new instance of DistributedUniqueTimeProvider configured with the given hostId
*/
public static DistributedUniqueTimeProvider forHostId(@NonNegative int hostId) {
return new DistributedUniqueTimeProvider(hostId, false);
}
Expand Down Expand Up @@ -105,6 +127,7 @@ protected void performClose() {
bytes.release(this);
file.releaseLast();
}

/**
* Sets the hostId of the DistributedUniqueTimeProvider. The hostId is used to
* create unique timestamps across different hosts.
Expand All @@ -116,7 +139,7 @@ protected void performClose() {
public DistributedUniqueTimeProvider hostId(@NonNegative int hostId) {
// Check if the provided hostId is negative and throw an exception if it is
if (hostId < 0)
throw new IllegalArgumentException("Invalid hostId: " + hostId);
throw new IllegalArgumentException("Host ID must be non-negative but was: " + hostId);

// Assign the provided hostId modulo the maximum number of host IDs
// to ensure it's within the valid range
Expand All @@ -142,6 +165,11 @@ public DistributedUniqueTimeProvider provider(TimeProvider provider) {
}

/**
* NOTE: Calls to this method do not produce unique timestamps, rather just calls the underlying provider.
* <p>
* Use {@link #currentTimeMicros()} or {@link #currentTimeNanos()} to generate unique timestamps,
* or use {@link net.openhft.chronicle.core.time.UniqueMicroTimeProvider#currentTimeMillis()} to generate unique timestamps.
* <p>
* @return Ordinary millisecond timestamp
*/
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,38 @@

import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.OS;
import net.openhft.chronicle.core.time.LongTime;
import net.openhft.chronicle.core.time.TimeProvider;
import net.openhft.chronicle.core.time.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class DistributedUniqueTimeProviderTest extends BytesTestCommon {

private DistributedUniqueTimeProvider timeProvider;
private SetTimeProvider setTimeProvider;

@Before
public void setUp() {
timeProvider = DistributedUniqueTimeProvider.instance();
setTimeProvider = new SetTimeProvider(SystemTimeProvider.INSTANCE.currentTimeNanos());
timeProvider.provider(setTimeProvider);
}

static volatile long blackHole;

@BeforeClass
Expand All @@ -48,10 +65,9 @@ public static void checks() throws IOException {

@Test
public void currentTimeMicros() {
TimeProvider tp = DistributedUniqueTimeProvider.instance();
long last = 0;
for (int i = 0; i < 100_000; i++) {
long time = tp.currentTimeMicros();
long time = timeProvider.currentTimeMicros();
assertTrue(time > last);
assertEquals(LongTime.toMicros(time), time);
last = time;
Expand All @@ -60,12 +76,11 @@ public void currentTimeMicros() {

@Test
public void currentTimeMicrosPerf() {
TimeProvider tp = DistributedUniqueTimeProvider.instance();
long start = System.currentTimeMillis(), end;
int count = 0;
do {
for (int i = 0; i < 1000; i++)
blackHole = tp.currentTimeMicros();
blackHole = ((TimeProvider) timeProvider).currentTimeMicros();
count += 1000;
} while ((end = System.currentTimeMillis()) < start + 500);
long rate = 1000L * count / (end - start);
Expand All @@ -75,12 +90,11 @@ public void currentTimeMicrosPerf() {

@Test
public void currentTimeNanosPerf() {
TimeProvider tp = DistributedUniqueTimeProvider.instance();
long start = System.currentTimeMillis(), end;
int count = 0;
do {
for (int i = 0; i < 1000; i++)
blackHole = tp.currentTimeNanos();
blackHole = ((TimeProvider) timeProvider).currentTimeNanos();
count += 1000;
} while ((end = System.currentTimeMillis()) < start + 500);
long rate = 1000L * count / (end - start);
Expand All @@ -90,13 +104,12 @@ public void currentTimeNanosPerf() {

@Test
public void currentTimeNanos() {
TimeProvider tp = DistributedUniqueTimeProvider.instance();
long start = tp.currentTimeNanos();
long start = ((TimeProvider) timeProvider).currentTimeNanos();
long last = start;
int count = 0;
long runTime = Jvm.isArm() ? 3_000_000_000L : 500_000_000L;
for (; ; ) {
long now = tp.currentTimeNanos();
long now = ((TimeProvider) timeProvider).currentTimeNanos();
assertEquals(LongTime.toNanos(now), now);
if (now > start + runTime)
break;
Expand Down Expand Up @@ -140,12 +153,152 @@ public void concurrentTimeNanos() {

@Test
public void testMonotonicallyIncreasing() {
TimeProvider tp = DistributedUniqueTimeProvider.instance();
long last = 0;
for (int i = 0; i < 10_000; i++) {
long now = DistributedUniqueTimeProvider.timestampFor(tp.currentTimeNanos());
long now = DistributedUniqueTimeProvider.timestampFor(((TimeProvider) timeProvider).currentTimeNanos());
assertTrue(now > last);
last = now;
}
}

@Test
public void shouldProvideUniqueTimeAcrossThreadsMicros() throws InterruptedException {
final Set<Long> allGeneratedTimestamps = ConcurrentHashMap.newKeySet();
final int numberOfThreads = 50;
final int factor = 50;
final int iterationsPerThread = 500;
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads * factor);

for (int i = 0; i < numberOfThreads * factor; i++) {
executor.execute(() -> {
try {
List<Long> threadTimeSet = new ArrayList<>(iterationsPerThread);
long lastTimestamp = 0;
for (int j = 0; j < iterationsPerThread; j++) {

// there could be a race condition for the next two methods, but it shouldn't matter for this test
setTimeProvider.advanceNanos(j);
long currentTimeMicros = timeProvider.currentTimeMicros();

threadTimeSet.add(currentTimeMicros);
assertTrue("Timestamps should always increase", currentTimeMicros > lastTimestamp);
lastTimestamp = currentTimeMicros;
}
allGeneratedTimestamps.addAll(threadTimeSet);
} finally {
latch.countDown();
}
});
}

latch.await();
executor.shutdown();

assertEquals("All timestamps across all threads and iterations should be unique",
numberOfThreads * iterationsPerThread * factor, allGeneratedTimestamps.size());
}

@Test
public void shouldProvideUniqueTimeAcrossThreadsNanos() throws InterruptedException {
final Set<Long> allGeneratedTimestamps = ConcurrentHashMap.newKeySet();
final int numberOfThreads = 50;
final int factor = 50;
final int iterationsPerThread = 500;
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
final CountDownLatch latch = new CountDownLatch(numberOfThreads * factor);

for (int i = 0; i < numberOfThreads * factor; i++) {
executor.execute(() -> {
try {
List<Long> threadTimeSet = new ArrayList<>(iterationsPerThread);
long lastTimestamp = 0;
for (int j = 0; j < iterationsPerThread; j++) {

// there could be a race condition for the next two methods, but it shouldn't matter for this test
setTimeProvider.advanceNanos(j);
long currentTimeNanos = timeProvider.currentTimeNanos();

threadTimeSet.add(currentTimeNanos);
assertTrue("Timestamps should always be increasing", currentTimeNanos > lastTimestamp);
lastTimestamp = currentTimeNanos;
}
allGeneratedTimestamps.addAll(threadTimeSet);
} finally {
latch.countDown();
}
});
}

latch.await();
executor.shutdown();

assertEquals("All timestamps across all threads and iterations should be unique",
numberOfThreads * iterationsPerThread * factor, allGeneratedTimestamps.size());
}

@Test
public void shouldAdvanceTimeWhenExceedingCallsPerSecond() {
final int iterations = 1_000_001;
long lastTimeMicros = 0;

for (int i = 0; i < iterations; i++) {
setTimeProvider.advanceNanos(i);
long currentTimeMicros = timeProvider.currentTimeMicros();
assertTrue("Each timestamp must be greater than the last", currentTimeMicros > lastTimeMicros);
lastTimeMicros = currentTimeMicros;
}
}

@Test
public void currentTimeMillisShouldBeCorrect() {
int iterations = 1_000;
long lastTimeMillis = 0;
final long startTimeMillis = setTimeProvider.currentTimeMillis();

for (int i = 0; i < iterations; i++) {
setTimeProvider.advanceNanos(i);
long currentTimeMillis = timeProvider.currentTimeMillis();
assertTrue(currentTimeMillis >= startTimeMillis);
assertTrue(currentTimeMillis <= startTimeMillis + iterations);
assertTrue("Millisecond timestamps must increase or be the same", currentTimeMillis >= lastTimeMillis);
lastTimeMillis = currentTimeMillis;
}
}

@Test
public void currentTimeMicrosShouldBeCorrect() {
long lastTimeMicros = 0;

for (int i = 0; i < 4_000; i++) {
setTimeProvider.advanceNanos(i);
long currentTimeMicros = timeProvider.currentTimeMicros();
assertTrue("Microsecond timestamps must increase", currentTimeMicros > lastTimeMicros);
lastTimeMicros = currentTimeMicros;
}
}

@Test
public void currentTimeMicrosShouldBeCorrectBackwards() {
long lastTimeMicros = 0;

for (int i = 0; i < 4_000; i++) {
setTimeProvider.advanceNanos(-i);
long currentTimeMicros = timeProvider.currentTimeMicros();
assertTrue("Microsecond timestamps must increase", currentTimeMicros > lastTimeMicros);
lastTimeMicros = currentTimeMicros;
}
}

@Test
public void currentTimeNanosShouldBeCorrect() {
long lastTimeNanos = 0;

for (int i = 0; i < 4_000; i++) {
setTimeProvider.advanceNanos(i);
long currentTimeNanos = timeProvider.currentTimeNanos();
assertTrue("Nanosecond timestamps should increase", currentTimeNanos > lastTimeNanos);
lastTimeNanos = currentTimeNanos / 1000;
}
}
}

0 comments on commit 26d7fbc

Please sign in to comment.