Skip to content

Commit

Permalink
CLIENT-3168: Support Touched method (#139)
Browse files Browse the repository at this point in the history
  • Loading branch information
shannonklaus authored Dec 5, 2024
1 parent bff1a09 commit 8e202be
Show file tree
Hide file tree
Showing 8 changed files with 280 additions and 16 deletions.
48 changes: 44 additions & 4 deletions AerospikeClient/Async/AsyncClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2023 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -453,15 +453,15 @@ public void Delete(BatchPolicy batchPolicy, BatchDeletePolicy deletePolicy, Batc

new AsyncBatchOperateRecordSequenceExecutor(cluster, batchPolicy, listener, keys, null, attr);
}

//-------------------------------------------------------
// Touch Operations
//-------------------------------------------------------

/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Create listener, call asynchronous touch and return task monitor.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="token">cancellation token</param>
Expand All @@ -478,7 +478,7 @@ public Task Touch(WritePolicy policy, CancellationToken token, Key key)
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Schedule the touch command with a channel selector and return.
/// Another thread will process the command and send the results to the listener.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="listener">where to send results, pass in null for fire and forget</param>
Expand All @@ -494,6 +494,46 @@ public void Touch(WritePolicy policy, WriteListener listener, Key key)
async.Execute();
}

/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Create listener, call asynchronous touched and return task monitor.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="token">cancellation token</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if queue is full</exception>
public Task<bool> Touched(WritePolicy policy, CancellationToken token, Key key)
{
ExistsListenerAdapter listener = new(token);
Touched(policy, listener, key);
return listener.Task;
}

/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Schedule the touched command with a channel selector and return.
/// Another thread will process the command and send the results to the listener.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// <para>
/// If the record does not exist, send a value of false to
/// <see cref="ExistsListener.OnSuccess(Key, bool)"/>
/// </para>
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="listener">where to send results, pass in null for fire and forget</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if queue is full</exception>
public void Touched(WritePolicy policy, ExistsListener listener, Key key)
{
if (policy == null)
{
policy = writePolicyDefault;
}
AsyncTouch async = new(cluster, policy, listener, key);
async.Execute();
}

//-------------------------------------------------------
// Existence-Check Operations
//-------------------------------------------------------
Expand Down
34 changes: 34 additions & 0 deletions AerospikeClient/Async/AsyncTouch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,28 @@ public sealed class AsyncTouch : AsyncSingleCommand
{
private readonly WritePolicy writePolicy;
private readonly WriteListener listener;
private readonly ExistsListener existsListener;
private readonly Key key;
private readonly Partition partition;
private bool touched;

public AsyncTouch(AsyncCluster cluster, WritePolicy writePolicy, WriteListener listener, Key key)
: base(cluster, writePolicy)
{
this.writePolicy = writePolicy;
this.listener = listener;
this.existsListener = null;
this.key = key;
this.partition = Partition.Write(cluster, policy, key);
cluster.AddTran();
}

public AsyncTouch(AsyncCluster cluster, WritePolicy writePolicy, ExistsListener listener, Key key)
: base(cluster, writePolicy)
{
this.writePolicy = writePolicy;
this.listener = null;
this.existsListener = listener;
this.key = key;
this.partition = Partition.Write(cluster, policy, key);
cluster.AddTran();
Expand All @@ -39,6 +53,7 @@ public AsyncTouch(AsyncTouch other)
{
this.writePolicy = other.writePolicy;
this.listener = other.listener;
this.existsListener = other.existsListener;
this.key = other.key;
this.partition = other.partition;
}
Expand Down Expand Up @@ -74,6 +89,17 @@ protected internal override void ParseResult()

if (resultCode == 0)
{
touched = true;
return;
}

touched = false;
if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR)
{
if (existsListener == null)
{
throw new AerospikeException(resultCode);
}
return;
}

Expand Down Expand Up @@ -101,6 +127,10 @@ protected internal override void OnSuccess()
{
listener.OnSuccess(key);
}
else if (existsListener != null)
{
existsListener.OnSuccess(key, touched);
}
}

protected internal override void OnFailure(AerospikeException e)
Expand All @@ -109,6 +139,10 @@ protected internal override void OnFailure(AerospikeException e)
{
listener.OnFailure(e);
}
else if (existsListener != null)
{
existsListener.OnFailure(e);
}
}
}
}
33 changes: 30 additions & 3 deletions AerospikeClient/Async/IAsyncClient.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2012-2022 Aerospike, Inc.
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
Expand Down Expand Up @@ -265,7 +265,7 @@ public interface IAsyncClient : IAerospikeClient
/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Create listener, call asynchronous touch and return task monitor.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="token">cancellation token</param>
Expand All @@ -277,14 +277,41 @@ public interface IAsyncClient : IAerospikeClient
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Schedule the touch command with a channel selector and return.
/// Another thread will process the command and send the results to the listener.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="listener">where to send results, pass in null for fire and forget</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if queue is full</exception>
void Touch(WritePolicy policy, WriteListener listener, Key key);

/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Create listener, call asynchronous touched and return task monitor.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="token">cancellation token</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if queue is full</exception>
Task<bool> Touched(WritePolicy policy, CancellationToken token, Key key);

/// <summary>
/// Asynchronously reset record's time to expiration using the policy's expiration.
/// Schedule the touched command with a channel selector and return.
/// Another thread will process the command and send the results to the listener.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// <para>
/// If the record does not exist, send a value of false to
/// <see cref="ExistsListener.OnSuccess(Key, bool)"/>
/// </para>
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="listener">where to send results, pass in null for fire and forget</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if queue is full</exception>
void Touched(WritePolicy policy, ExistsListener listener, Key key);

//-------------------------------------------------------
// Existence-Check Operations
//-------------------------------------------------------
Expand Down
24 changes: 24 additions & 0 deletions AerospikeClient/Command/TouchCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,26 @@ public sealed class TouchCommand : SyncCommand
private readonly WritePolicy writePolicy;
private readonly Key key;
private readonly Partition partition;
private readonly bool failOnNotFound;
internal bool Touched { get; private set; }

public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key)
: base(cluster, writePolicy)
{
this.writePolicy = writePolicy;
this.key = key;
this.partition = Partition.Write(cluster, writePolicy, key);
this.failOnNotFound = true;
cluster.AddTran();
}

public TouchCommand(Cluster cluster, WritePolicy writePolicy, Key key, bool failOnNotFound)
: base(cluster, writePolicy)
{
this.writePolicy = writePolicy;
this.key = key;
this.partition = Partition.Write(cluster, writePolicy, key);
this.failOnNotFound = failOnNotFound;
cluster.AddTran();
}

Expand Down Expand Up @@ -58,6 +71,17 @@ protected internal override void ParseResult(Connection conn)

if (resultCode == 0)
{
Touched = true;
return;
}

Touched = false;
if (resultCode == ResultCode.KEY_NOT_FOUND_ERROR)
{
if (failOnNotFound)
{
throw new AerospikeException(resultCode);
}
return;
}

Expand Down
24 changes: 23 additions & 1 deletion AerospikeClient/Main/AerospikeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,8 @@ public void Truncate(InfoPolicy policy, string ns, string set, DateTime? beforeL

/// <summary>
/// Reset record's time to expiration using the policy's expiration.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// Throw an exception if the record does not exist.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="key">unique record identifier</param>
Expand All @@ -676,6 +677,27 @@ public void Touch(WritePolicy policy, Key key)
command.Execute();
}

/// <summary>
/// Reset record's time to expiration using the policy's expiration.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// Return true if the record exists and is touched.Return false if the record does not exist.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="key">unique record identifier</param>
/// <returns>true if record was touched, false otherwise</returns>
/// <exception cref="AerospikeException">if touch fails</exception>
public bool Touched(WritePolicy policy, Key key)
{
if (policy == null)
{
policy = writePolicyDefault;
}
TouchCommand command = new(cluster, policy, key, false);
command.Execute();

return command.Touched;
}

//-------------------------------------------------------
// Existence-Check Operations
//-------------------------------------------------------
Expand Down
17 changes: 16 additions & 1 deletion AerospikeClient/Main/IAerospikeClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,30 @@ public interface IAerospikeClient
// Touch Operations
//-------------------------------------------------------


/// <summary>
/// Reset record's time to expiration using the policy's expiration.
/// Fail if the record does not exist.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// Throw an exception if the record does not exist.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="key">unique record identifier</param>
/// <exception cref="AerospikeException">if touch fails</exception>
void Touch(WritePolicy policy, Key key);



/// <summary>
/// Reset record's time to expiration using the policy's expiration.
/// If the record does not exist, it can't be created because the server deletes empty records.
/// Return true if the record exists and is touched.Return false if the record does not exist.
/// </summary>
/// <param name="policy">write configuration parameters, pass in null for defaults</param>
/// <param name="key">unique record identifier</param>
/// <returns>true if record was touched, false otherwise</returns>
/// <exception cref="AerospikeException">if touch fails</exception>
bool Touched(WritePolicy policy, Key key);

//-------------------------------------------------------
// Existence-Check Operations
//-------------------------------------------------------
Expand Down
56 changes: 56 additions & 0 deletions AerospikeTest/Async/TestAsyncTouch.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2012-2024 Aerospike, Inc.
*
* Portions may be licensed to Aerospike, Inc. under one or more contributor
* license agreements.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
using Aerospike.Client;
using Microsoft.VisualStudio.TestTools.UnitTesting;

namespace Aerospike.Test
{
[TestClass]
public class TestAsyncTouch : TestAsync
{
[TestMethod]
public void AsyncTouched()
{
Key key = new(args.ns, args.set, "doesNotExistAsyncTouch");

client.Touched(null, new TouchListener(this), key);
WaitTillComplete();
}

private class TouchListener : ExistsListener
{
private readonly TestAsyncTouch parent;

public TouchListener(TestAsyncTouch parent)
{
this.parent = parent;
}

public void OnSuccess(Key key, bool exists)
{
Assert.IsFalse(exists);
parent.NotifyCompleted();
}

public void OnFailure(AerospikeException e)
{
parent.SetError(e);
parent.NotifyCompleted();
}
}
}
}
Loading

0 comments on commit 8e202be

Please sign in to comment.