Skip to content

Commit

Permalink
#743 improve saturnJobReturn for batch consume
Browse files Browse the repository at this point in the history
  • Loading branch information
kino.lu committed Apr 26, 2021
1 parent 7f1e95f commit 2440ac0
Show file tree
Hide file tree
Showing 3 changed files with 166 additions and 65 deletions.
9 changes: 9 additions & 0 deletions saturn-job-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,13 @@
</parent>
<artifactId>saturn-job-api</artifactId>
<name>${project.artifactId}</name>

<dependencies>
<!-- Test start -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
107 changes: 42 additions & 65 deletions saturn-job-api/src/main/java/com/vip/saturn/job/SaturnJobReturn.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class SaturnJobReturn implements Serializable {
/**
* 返回的属性,消息服务的作业会将该属性设置到发送的Channel中
*/
private Map<String, String> prop;
private Map<String, String> prop = new ConcurrentHashMap<>();

public static SaturnJobReturnBuilder builder() {
return new SaturnJobReturnBuilder();
Expand Down Expand Up @@ -168,16 +168,17 @@ public Map<String, String> getProp() {
}

public void setProp(Map<String, String> prop) {
this.prop = prop;
if (prop == null) {
this.prop = new HashMap<>();
} else {
this.prop = prop;
}
}

/**
* only use for single consume
*/
public void reconsumeLater() {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.RECONSUME_LATER.name());
}

Expand All @@ -186,9 +187,6 @@ public void reconsumeLater() {
*/
@Deprecated
public void reconsumeLater(int delayLevel) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.RECONSUME_LATER.name());
prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel));
}
Expand All @@ -197,9 +195,6 @@ public void reconsumeLater(int delayLevel) {
* only use for single consume
*/
public void reconsumeLater(SaturnDelayedLevel delayLevel) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.RECONSUME_LATER.name());
prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue()));
}
Expand All @@ -208,59 +203,42 @@ public void reconsumeLater(SaturnDelayedLevel delayLevel) {
* only use for single consume
*/
public void complete() {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.CONSUME_SUCCESS.name());
}

/**
* only use for single consume
*/
public void discard() {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(SaturnJobReturn.MSG_CONSUME_STATUS_PROP_KEY, SaturnConsumeStatus.CONSUME_DISCARD.name());
}

/**
* only use for batch consume
*/
public void completeAll() {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_SUCCESS_OFFSETS, MSG_ALL);
}

/**
* only use for batch consume
*/
public boolean isCompleteAll() {
if (prop == null) {
return false;
}
return MSG_ALL.equals(prop.get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS));
}

/**
* only use for batch consume
*/
public void completeSome(List<MsgHolder> msgHolders) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_SUCCESS_OFFSETS, collectOffsetsToString(msgHolders));
String finalOffsets = mergeOffsets(prop.get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS), msgHolders);
prop.put(MSG_BATCH_CONSUME_SUCCESS_OFFSETS, finalOffsets);
}

/**
* only use for batch consume
*/
public List<String> getCompleteOffsets() {
if (prop == null) {
return Collections.emptyList();
}
String offsetsStr = prop.get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS);
return parseOffsetsStr(offsetsStr);
}
Expand All @@ -276,22 +254,18 @@ public void reconsumeSome(List<MsgHolder> msgHolders) {
* only use for batch consume
*/
public void reconsumeSome(List<MsgHolder> msgHolders, SaturnDelayedLevel delayLevel) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_DELAY_OFFSETS, collectOffsetsToString(msgHolders));
String finalOffsets = mergeOffsets(prop.get(MSG_BATCH_CONSUME_DELAY_OFFSETS), msgHolders);
prop.put(MSG_BATCH_CONSUME_DELAY_OFFSETS, finalOffsets);
if (delayLevel != null) {
prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue()));
}

}

/**
* only use for batch consume
*/
public List<String> getReconsumeOffsets() {
if (prop == null) {
return Collections.emptyList();
}
String offsetsStr = prop.get(MSG_BATCH_CONSUME_DELAY_OFFSETS);
return parseOffsetsStr(offsetsStr);
}
Expand All @@ -307,9 +281,6 @@ public void reconsumeAllLater() {
* only use for batch consume
*/
public void reconsumeAllLater(SaturnDelayedLevel delayLevel) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_DELAY_OFFSETS, MSG_ALL);
if (delayLevel != null) {
prop.put(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY, String.valueOf(delayLevel.getValue()));
Expand All @@ -320,36 +291,25 @@ public void reconsumeAllLater(SaturnDelayedLevel delayLevel) {
* only use for batch consume
*/
public boolean isReconsumeAll() {
if (prop == null) {
return false;
}
return MSG_ALL.equals(prop.get(MSG_BATCH_CONSUME_DELAY_OFFSETS));
}

public String getDelayLevel() {
if (prop == null) {
return null;
}
return prop.get(SaturnJobReturn.DELAY_LEVEL_WHEN_RECONSUME_PROP_KEY);
}

/**
* only use for batch consume
*/
public void discardSome(List<MsgHolder> msgHolders) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_DISCARD_OFFSETS, collectOffsetsToString(msgHolders));
String finalOffsets = mergeOffsets(prop.get(MSG_BATCH_CONSUME_DISCARD_OFFSETS), msgHolders);
prop.put(MSG_BATCH_CONSUME_DISCARD_OFFSETS, finalOffsets);
}

/**
* only use for batch consume
*/
public List<String> getDiscardOffsets() {
if (prop == null) {
return Collections.emptyList();
}
String offsetsStr = prop.get(MSG_BATCH_CONSUME_DISCARD_OFFSETS);
return parseOffsetsStr(offsetsStr);
}
Expand All @@ -358,38 +318,55 @@ public List<String> getDiscardOffsets() {
* only use for batch consume
*/
public void setBatchConsumeDefaultStatus(SaturnConsumeStatus consumeStatus) {
if (prop == null) {
prop = new ConcurrentHashMap<>();
}
prop.put(MSG_BATCH_CONSUME_DEFAULT_STATUS, consumeStatus.name());
}

/**
* only use for batch consume
*/
public String getBatchConsumeDefaultStatus() {
if (prop == null) {
return null;
}
return prop.get(MSG_BATCH_CONSUME_DEFAULT_STATUS);
}

private String mergeOffsets(String existedOffsets, List<MsgHolder> newMsgHolders) {
if (MSG_ALL.equals(existedOffsets)) {
return MSG_ALL;
}

List<Long> newOffsetList = new ArrayList<>();
for (MsgHolder msgHolder : newMsgHolders) {
newOffsetList.add(msgHolder.getOffset());
}

if (existedOffsets == null || existedOffsets.isEmpty()) {
return collectOffsetsToString(newOffsetList);
}

// 去重
List<String> existedOffsetList = parseOffsetsStr(existedOffsets);
List<Long> needToAddOffsets = new ArrayList<>();
for (Long newOffset : newOffsetList) {
if (!existedOffsetList.contains(String.valueOf(newOffset))) {
needToAddOffsets.add(newOffset);
}
}

return existedOffsets + OFFSET_SEPERATOR + collectOffsetsToString(needToAddOffsets);
}

@Override
public String toString() {
return "SaturnJobReturn [returnCode=" + returnCode + ", returnMsg=" + returnMsg + ", errorGroup=" + errorGroup
+ ", prop=" + prop + "]";
}

private String collectOffsetsToString(List<MsgHolder> msgHolders) {
if (msgHolders == null && msgHolders.size() == 0) {
return "";
}
private String collectOffsetsToString(List<Long> offsets) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < msgHolders.size(); i++) {
for (int i = 0; i < offsets.size(); i++) {
if (i > 0) {
sb.append(OFFSET_SEPERATOR);
}
sb.append(String.valueOf(msgHolders.get(i).getOffset()));
sb.append(String.valueOf(offsets.get(i)));
}
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package com.vip.saturn.job;

import com.vip.saturn.job.msg.MsgHolder;
import com.vip.saturn.job.msg.SaturnDelayedLevel;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.List;

import static com.vip.saturn.job.SaturnJobReturn.*;

public class SaturnJobReturnTest {

@Test
public void testComplete() {
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder().complete().build();
Assert.assertEquals(SaturnConsumeStatus.CONSUME_SUCCESS.name(),
saturnJobReturn.getProp().get(MSG_CONSUME_STATUS_PROP_KEY));
}

@Test
public void testDiscard() {
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder().discard().build();
Assert.assertEquals(SaturnConsumeStatus.CONSUME_DISCARD.name(),
saturnJobReturn.getProp().get(MSG_CONSUME_STATUS_PROP_KEY));
}

@Test
public void testReconsume() {
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder().returnCode(SaturnSystemReturnCode.USER_FAIL)
.errorGroup(SaturnSystemErrorGroup.FAIL).reconsumeLater(SaturnDelayedLevel.DELAYED_LEVEL_1H).build();
Assert.assertEquals(SaturnConsumeStatus.RECONSUME_LATER.name(),
saturnJobReturn.getProp().get(MSG_CONSUME_STATUS_PROP_KEY));
Assert.assertEquals(SaturnDelayedLevel.DELAYED_LEVEL_1H.getValue(),
Integer.parseInt(saturnJobReturn.getDelayLevel()));
Assert.assertEquals(SaturnSystemReturnCode.USER_FAIL, saturnJobReturn.getReturnCode());
Assert.assertEquals(SaturnSystemErrorGroup.FAIL, saturnJobReturn.getErrorGroup());
}

@Test
public void testCompleteAll() {
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder().completeAll().build();
Assert.assertTrue(saturnJobReturn.isCompleteAll());
Assert.assertEquals(SaturnSystemReturnCode.SUCCESS, saturnJobReturn.getReturnCode());
Assert.assertEquals(SaturnSystemErrorGroup.SUCCESS, saturnJobReturn.getErrorGroup());
}

@Test
public void testReconsumeAll() {
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder().reconsumeAll(SaturnDelayedLevel.DELAYED_LEVEL_10S)
.build();
Assert.assertTrue(saturnJobReturn.isReconsumeAll());
Assert.assertEquals(SaturnDelayedLevel.DELAYED_LEVEL_10S.getValue(),
Integer.parseInt(saturnJobReturn.getDelayLevel()));
Assert.assertEquals(SaturnSystemReturnCode.SUCCESS, saturnJobReturn.getReturnCode());
Assert.assertEquals(SaturnSystemErrorGroup.SUCCESS, saturnJobReturn.getErrorGroup());
}

@Test
public void testCompleteSome() {
List<MsgHolder> completeMsgHolders = new ArrayList<>();
for (long i = 1; i <= 5; i++) {
completeMsgHolders.add(new MsgHolder(null, null, "", i));
}
List<MsgHolder> reconsumeMsgHolders = new ArrayList<>();
for (long i = 11; i <= 15; i++) {
reconsumeMsgHolders.add(new MsgHolder(null, null, "", i));
}
List<MsgHolder> discardMsgHolders = new ArrayList<>();
for (long i = 21; i <= 25; i++) {
discardMsgHolders.add(new MsgHolder(null, null, "", i));
}
SaturnJobReturn saturnJobReturn = SaturnJobReturn.builder()
.batchConsumeDefaultStatus(SaturnConsumeStatus.CONSUME_SUCCESS).completeSome(completeMsgHolders)
.reconsumeSome(reconsumeMsgHolders).discardSome(discardMsgHolders).build();

Assert.assertEquals(SaturnSystemReturnCode.SUCCESS, saturnJobReturn.getReturnCode());
Assert.assertEquals(SaturnSystemErrorGroup.SUCCESS, saturnJobReturn.getErrorGroup());
Assert.assertEquals(SaturnConsumeStatus.CONSUME_SUCCESS.name(), saturnJobReturn.getBatchConsumeDefaultStatus());
Assert.assertEquals(5, saturnJobReturn.getCompleteOffsets().size());
Assert.assertEquals("1,2,3,4,5", saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS));
Assert.assertEquals(5, saturnJobReturn.getReconsumeOffsets().size());
Assert.assertEquals("11,12,13,14,15", saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_DELAY_OFFSETS));
Assert.assertEquals(5, saturnJobReturn.getDiscardOffsets().size());
Assert.assertEquals("21,22,23,24,25", saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_DISCARD_OFFSETS));

// 测试增量
completeMsgHolders.clear();
for (long i = 6; i <= 9; i++) {
completeMsgHolders.add(new MsgHolder(null, null, "", i));
}
reconsumeMsgHolders.clear();
for (long i = 16; i <= 19; i++) {
reconsumeMsgHolders.add(new MsgHolder(null, null, "", i));
}
discardMsgHolders.clear();
for (long i = 26; i <= 29; i++) {
discardMsgHolders.add(new MsgHolder(null, null, "", i));
}
saturnJobReturn.completeSome(completeMsgHolders);
saturnJobReturn.reconsumeSome(reconsumeMsgHolders);
saturnJobReturn.discardSome(discardMsgHolders);

Assert.assertEquals(9, saturnJobReturn.getCompleteOffsets().size());
Assert.assertEquals("1,2,3,4,5,6,7,8,9", saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_SUCCESS_OFFSETS));
Assert.assertEquals(9, saturnJobReturn.getReconsumeOffsets().size());
Assert.assertEquals("11,12,13,14,15,16,17,18,19",
saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_DELAY_OFFSETS));
Assert.assertEquals(9, saturnJobReturn.getDiscardOffsets().size());
Assert.assertEquals("21,22,23,24,25,26,27,28,29",
saturnJobReturn.getProp().get(MSG_BATCH_CONSUME_DISCARD_OFFSETS));

}
}

0 comments on commit 2440ac0

Please sign in to comment.