Skip to content

Commit

Permalink
fix #2862 Optimize SearchLogHelper for direct queue handling and stre…
Browse files Browse the repository at this point in the history
…amlined batch processing.
  • Loading branch information
marevol committed Dec 27, 2024
1 parent 80e9cb0 commit cfa492e
Showing 1 changed file with 82 additions and 69 deletions.
151 changes: 82 additions & 69 deletions src/main/java/org/codelibs/fess/helper/SearchLogHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ public class SearchLogHelper {

protected int userInfoCacheSize = 10000;

protected volatile Queue<SearchLog> searchLogQueue = new ConcurrentLinkedQueue<>();
protected Queue<SearchLog> searchLogQueue = new ConcurrentLinkedQueue<>();

protected volatile Queue<ClickLog> clickLogQueue = new ConcurrentLinkedQueue<>();
protected Queue<ClickLog> clickLogQueue = new ConcurrentLinkedQueue<>();

protected LoadingCache<String, UserInfo> userInfoCache;

Expand Down Expand Up @@ -203,16 +203,19 @@ public void addClickLog(final ClickLog clickLog) {
}

public void storeSearchLog() {
if (!searchLogQueue.isEmpty()) {
final Queue<SearchLog> queue = searchLogQueue;
searchLogQueue = new ConcurrentLinkedQueue<>();
processSearchLogQueue(queue);
}
storeSearchLogFromQueue();
storeClickLogFromQueue();
}

protected void storeClickLogFromQueue() {
if (!clickLogQueue.isEmpty()) {
final Queue<ClickLog> queue = clickLogQueue;
clickLogQueue = new ConcurrentLinkedQueue<>();
processClickLogQueue(queue);
processClickLogQueue(clickLogQueue);
}
}

protected void storeSearchLogFromQueue() {
if (!searchLogQueue.isEmpty()) {
processSearchLogQueue(searchLogQueue);
}
}

Expand Down Expand Up @@ -271,27 +274,40 @@ protected void processSearchLogQueue(final Queue<SearchLog> queue) {
botNames = value.split(",");
}

final int batchSize = ComponentUtil.getFessConfig().getSearchlogProcessBatchSizeAsInteger();

final List<SearchLog> searchLogList = new ArrayList<>();
final Map<String, UserInfo> userInfoMap = new HashMap<>();
queue.stream().forEach(searchLog -> {
final String userAgent = searchLog.getUserAgent();
final boolean isBot =
userAgent != null && stream(botNames).get(stream -> stream.anyMatch(botName -> userAgent.indexOf(botName) >= 0));
if (!isBot) {
searchLog.getUserInfo().ifPresent(userInfo -> {
final String code = userInfo.getId();
final UserInfo oldUserInfo = userInfoMap.get(code);
if (oldUserInfo != null) {
userInfo.setCreatedAt(oldUserInfo.getCreatedAt());
}
userInfoMap.put(code, userInfo);
});
searchLogList.add(searchLog);
while (!queue.isEmpty()) {
final SearchLog searchLog = queue.poll();
if (searchLog != null) {
final String userAgent = searchLog.getUserAgent();
final boolean isBot =
userAgent != null && stream(botNames).get(stream -> stream.anyMatch(botName -> userAgent.indexOf(botName) >= 0));
if (!isBot) {
searchLog.getUserInfo().ifPresent(userInfo -> {
final String code = userInfo.getId();
final UserInfo oldUserInfo = userInfoMap.get(code);
if (oldUserInfo != null) {
userInfo.setCreatedAt(oldUserInfo.getCreatedAt());
}
userInfoMap.put(code, userInfo);
});
searchLogList.add(searchLog);
}
}
});
if (searchLogList.size() >= batchSize) {
processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
searchLogList.clear();
userInfoMap.clear();
}
}

processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
if (!searchLogList.isEmpty()) {
processUserInfoLog(searchLogList, userInfoMap);
processSearchLog(searchLogList);
}
}

private void processSearchLog(final List<SearchLog> searchLogList) {
Expand Down Expand Up @@ -341,47 +357,52 @@ protected void processUserInfoLog(final List<SearchLog> searchLogList, final Map

protected void storeSearchLogList(final List<SearchLog> searchLogList) {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
final int batchSize = ComponentUtil.getFessConfig().getSearchlogProcessBatchSizeAsInteger();
final int totalSize = searchLogList.size();
for (int i = 0; i < totalSize; i += batchSize) {
final int end = Math.min(totalSize, i + batchSize);
if (logger.isDebugEnabled()) {
logger.debug("Sending {} search logs. ({}-{}/{})", end - i, i, end, totalSize);
}
searchLogBhv.batchUpdate(searchLogList.subList(i, end), op -> {
op.setRefreshPolicy(Constants.TRUE);
});
}
searchLogBhv.batchUpdate(searchLogList, op -> {
op.setRefreshPolicy(Constants.TRUE);
});
}

protected void processClickLogQueue(final Queue<ClickLog> queue) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
final int batchSize = fessConfig.getSearchlogProcessBatchSizeAsInteger();
final Map<String, Integer> clickCountMap = new HashMap<>();
final List<ClickLog> clickLogList = new ArrayList<>();
for (final ClickLog clickLog : queue) {
try {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
searchLogBhv.selectEntity(cb -> {
cb.query().setQueryId_Equal(clickLog.getQueryId());
}).ifPresent(entity -> {
clickLogList.add(clickLog);
final String docId = clickLog.getDocId();
Integer countObj = clickCountMap.get(docId);
if (countObj == null) {
countObj = 1;
} else {
countObj = countObj.intValue() + 1;
}
clickCountMap.put(docId, countObj);
}).orElse(() -> {
logger.warn("Not Found for SearchLog: {}", clickLog);
});
} catch (final Exception e) {
logger.warn("Failed to process: {}", clickLog, e);
while (!queue.isEmpty()) {
final ClickLog clickLog = queue.poll();
if (clickLog != null) {
try {
final SearchLogBhv searchLogBhv = ComponentUtil.getComponent(SearchLogBhv.class);
searchLogBhv.selectEntity(cb -> {
cb.query().setQueryId_Equal(clickLog.getQueryId());
}).ifPresent(entity -> {
clickLogList.add(clickLog);
final String docId = clickLog.getDocId();
Integer countObj = clickCountMap.get(docId);
if (countObj == null) {
countObj = 1;
} else {
countObj = countObj.intValue() + 1;
}
clickCountMap.put(docId, countObj);
}).orElse(() -> {
logger.warn("Not Found for SearchLog: {}", clickLog);
});
} catch (final Exception e) {
logger.warn("Failed to process: {}", clickLog, e);
}
}
if (clickLogList.size() >= batchSize) {
processClickLog(clickLogList);
updateClickFieldInIndex(clickCountMap);
clickLogList.clear();
clickCountMap.clear();
}
}
processClickLog(clickLogList);

updateClickFieldInIndex(clickCountMap);
if (!clickLogList.isEmpty()) {
processClickLog(clickLogList);
updateClickFieldInIndex(clickCountMap);
}
}

protected void updateClickFieldInIndex(final Map<String, Integer> clickCountMap) {
Expand Down Expand Up @@ -424,15 +445,7 @@ protected void processClickLog(final List<ClickLog> clickLogList) {
final FessConfig fessConfig = ComponentUtil.getFessConfig();
try {
final ClickLogBhv clickLogBhv = ComponentUtil.getComponent(ClickLogBhv.class);
final int batchSize = fessConfig.getSearchlogProcessBatchSizeAsInteger();
final int totalSize = clickLogList.size();
for (int i = 0; i < totalSize; i += batchSize) {
final int end = Math.min(totalSize, i + batchSize);
if (logger.isDebugEnabled()) {
logger.debug("Sending {} click logs. ({}-{}/{})", end - i, i, end, totalSize);
}
clickLogBhv.batchInsert(clickLogList.subList(i, end));
}
clickLogBhv.batchInsert(clickLogList);
} catch (final Exception e) {
logger.warn("Failed to insert: {}", clickLogList, e);
}
Expand Down

0 comments on commit cfa492e

Please sign in to comment.