Feature/ratelimiter #948

Merged · 7 commits · Nov 16, 2023

26 changes: 18 additions & 8 deletions MServer-Config.yaml
@@ -17,12 +17,22 @@ maximumRequestsPerSecond: 999.0

# If set, only these senders will be crawled; all others will be ignored.
senderIncluded:
#- MDR
#- NDR
#- ARD
#- ARTE_DE
#- ARTE_FR
#- ARTE_EN
#- ARTE_PL
#- ARTE_IT
#- ARTE_ES
#- 3SAT
#- FUNK
#- KIKA
- DW
#- BR
#- DW
#- ORF
#- PHOENIX
#- SRF
- SR
#- ZDF

# If set, the server stays awake after the crawler run and restarts the run after the given interval.
#schedules:
@@ -111,14 +121,14 @@ checkImportListUrlTimeoutInSec: 1800

#### Default crawler configurations ####
# The maximum amount of URLs to be processed per task.
-maximumUrlsPerTask: 50
+maximumUrlsPerTask: 10

# The maximum duration in minutes a crawler may run.
maximumCrawlDurationInMinutes: 120

# Enables the topics search
# maximumSubpages limits the depth of the topics search
-topicsSearchEnabled: false
+topicsSearchEnabled: true

# The maximum amount of sub pages to be crawled.
# Example: If a Sendung overview page has 10 pages with videos for this Sendung and
@@ -164,14 +174,14 @@ senderConfigurations:
KIKA:
maximumSubpages: 2
maximumRequestsPerSecond: 8.0
SR:
maximumRequestsPerSecond: 2.0
ZDF:
maximumRequestsPerSecond: 10.0
FUNK:
maximumUrlsPerTask: 99
DW:
maximumSubpages: 0
SR:
maximumSubpages: 5

# configure string variables
crawlerApiParams:

JsoupConnection.java
@@ -1,5 +1,6 @@
package de.mediathekview.mserver.base.webaccess;

import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
@@ -22,12 +23,13 @@ public class JsoupConnection {
private static final String FILE_TYPE_M3U8 = "m3u8";
protected OkHttpClient client;

-public JsoupConnection(final int timeout) {
+public JsoupConnection(final int timeout, final int threadPoolSize) {
client =
new OkHttpClient.Builder()
.connectTimeout(timeout, TimeUnit.SECONDS)
.readTimeout(timeout, TimeUnit.SECONDS)
.callTimeout(timeout, TimeUnit.SECONDS)
+.connectionPool(new ConnectionPool(threadPoolSize, 5L, TimeUnit.MINUTES))
.build();
}

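A note on the pooling change: OkHttp's ConnectionPool(maxIdleConnections, keepAliveDuration, timeUnit) caps how many idle connections are kept open for reuse, so sizing it to the crawler's thread count lets each worker reuse a warm connection instead of opening a new one per request. A minimal, self-contained sketch of the client setup this constructor now produces; the timeout and thread-count values are hypothetical, not taken from this PR:

import java.util.concurrent.TimeUnit;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;

public class ConnectionPoolSketch {
  public static void main(String[] args) {
    int timeout = 30;        // hypothetical socket timeout in seconds
    int threadPoolSize = 8;  // hypothetical maximumCpuThreads value

    OkHttpClient client = new OkHttpClient.Builder()
        .connectTimeout(timeout, TimeUnit.SECONDS)
        .readTimeout(timeout, TimeUnit.SECONDS)
        .callTimeout(timeout, TimeUnit.SECONDS)
        // Keep up to threadPoolSize idle connections warm for 5 minutes,
        // matching the pool configuration added in this PR.
        .connectionPool(new ConnectionPool(threadPoolSize, 5L, TimeUnit.MINUTES))
        .build();

    // The pool is shared by every call made through this client.
    System.out.println("pooled connections: " + client.connectionPool().connectionCount());
  }
}

Idle connections past the keep-alive window are evicted automatically, so a crawl burst does not pin sockets indefinitely.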

ArteTaskBase.java
@@ -1,6 +1,5 @@
package de.mediathekview.mserver.crawler.arte.tasks;

-import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
@@ -19,7 +18,6 @@

public abstract class ArteTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(ArteTaskBase.class);
-private static RateLimiter limiter = null;
private final transient GsonBuilder gsonBuilder;

protected ArteTaskBase(
@@ -106,11 +104,6 @@ private Response executeRequest(final WebTarget aTarget) {
if (authKey.isPresent()) {
request = request.header(HEADER_AUTHORIZATION, authKey.get());
}

-if (limiter == null) {
-limiter = RateLimiter.create(crawler.getCrawlerConfig().getMaximumRequestsPerSecond());
-}
-limiter.acquire();
return request
.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP)
.header(HEADER_ACCEPT, APPLICATION_JSON)

AbstractCrawler.java
@@ -15,6 +15,8 @@
import org.apache.logging.log4j.Logger;
import org.jsoup.nodes.Document;

+import com.google.common.util.concurrent.RateLimiter;

import java.io.IOException;
import java.time.Duration;
import java.time.LocalDateTime;
@@ -41,6 +43,7 @@ public abstract class AbstractCrawler implements Callable<Set<Film>> {
protected Set<Film> films;
private LocalDateTime startTime;
protected JsoupConnection jsoupConnection;
+protected RateLimiter rateLimiter;

protected AbstractCrawler(
final ForkJoinPool aForkJoinPool,
@@ -58,8 +61,11 @@ protected AbstractCrawler(

runtimeConfig = rootConfig.getConfig();
crawlerConfig = rootConfig.getSenderConfig(getSender());
-jsoupConnection = new JsoupConnection(crawlerConfig.getSocketTimeoutInSeconds());

+jsoupConnection = new JsoupConnection(
+    rootConfig.getSenderConfig(getSender()).getSocketTimeoutInSeconds(),
+    runtimeConfig.getMaximumCpuThreads());
+rateLimiter = RateLimiter.create(rootConfig.getSenderConfig(getSender()).getMaximumRequestsPerSecond());

films = ConcurrentHashMap.newKeySet();
}

@@ -137,6 +143,14 @@ public JsoupConnection getConnection() {
public void setConnection(JsoupConnection connection) {
jsoupConnection = connection;
}

+public RateLimiter getRateLimiter() {
+  return rateLimiter;
+}
+
+public void setRateLimiter(RateLimiter rateLimiter) {
+  this.rateLimiter = rateLimiter;
+}

/**
* Request an url and receive the body as String
@@ -145,6 +159,7 @@ public void setConnection(JsoupConnection connection) {
* @throws IOException
*/
public String requestBodyAsString(String url) throws IOException {
+getRateLimiter().acquire();
return getConnection().requestBodyAsString(url);
}

@@ -155,6 +170,7 @@ public String requestBodyAsString(String url) throws IOException {
* @throws IOException
*/
public Document requestBodyAsHtmlDocument(String url) throws IOException {
+getRateLimiter().acquire();
return getConnection().requestBodyAsHtmlDocument(url);
}

@@ -165,6 +181,7 @@ public Document requestBodyAsHtmlDocument(String url) throws IOException {
* @throws IOException
*/
public Document requestBodyAsXmlDocument(String url) throws IOException {
+getRateLimiter().acquire();
return getConnection().requestBodyAsXmlDocument(url);
}

@@ -176,6 +193,7 @@ public Document requestBodyAsXmlDocument(String url) throws IOException {
* @return size of the response in KB or -1 in case we could not determine the size.
*/
public long determineFileSizeInKB(String url) {
+getRateLimiter().acquire();
return getConnection().determineFileSize(url) / 1024;
}

@@ -185,6 +203,7 @@ public long determineFileSizeInKB(String url) {
* @return return true if the request was successfully processed by the server
*/
public boolean requestUrlExists(String url) {
+getRateLimiter().acquire();
return getConnection().requestUrlExists(url);
}
/**
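This is the heart of the PR: one Guava RateLimiter per crawler instance, created from the sender's maximumRequestsPerSecond, replaces the lazily initialized static limiters removed from ArteTaskBase above and from the JSON/DW/ZDF tasks below. RateLimiter.create(permitsPerSecond) issues permits at a steady rate, and acquire() blocks until a permit is free, so every request helper in AbstractCrawler draws from one sender-wide budget. A small self-contained sketch of that blocking behavior; the 2.0 permits per second is just an illustrative value (it matches the old SR setting shown in the config diff):

import com.google.common.util.concurrent.RateLimiter;

public class RateLimiterSketch {
  public static void main(String[] args) {
    // Illustrative rate: 2 requests per second.
    RateLimiter limiter = RateLimiter.create(2.0);

    long start = System.nanoTime();
    for (int i = 1; i <= 5; i++) {
      limiter.acquire(); // blocks until the next permit is available
      System.out.printf("request %d after %.1f s%n", i, (System.nanoTime() - start) / 1e9);
    }
    // Prints roughly 0.0, 0.5, 1.0, 1.5, 2.0 seconds: one permit every half second.
  }
}

Because the limiter now lives on the crawler instance, senders no longer race on a static field, and tasks for different senders cannot accidentally share one limit.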

AbstractJsonRestTask.java
@@ -1,6 +1,5 @@
package de.mediathekview.mserver.crawler.basic;

-import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

@@ -23,7 +22,6 @@ public abstract class AbstractJsonRestTask<T, R, D extends CrawlerUrlDTO>
protected static final String ENCODING_GZIP = "gzip";
private static final long serialVersionUID = -1090560363478964885L;
protected final transient GsonBuilder gsonBuilder;
-private static RateLimiter limiter = null;

protected AbstractJsonRestTask(
final AbstractCrawler crawler,
@@ -63,10 +61,6 @@ protected void processRestTarget(final D aDTO, final WebTarget aTarget) {
}

protected Response createResponse(final Builder request, final D aDTO) {
-if (limiter == null) {
-limiter = RateLimiter.create(crawler.getCrawlerConfig().getMaximumRequestsPerSecond());
-}
-limiter.acquire();
request.header(ACCEPT_CHARSET, StandardCharsets.UTF_8);
return request.header(ACCEPT_ENCODING, ENCODING_GZIP).header("User-Agent", "Mozilla").get();
}

AbstractRestTask.java
@@ -72,6 +72,7 @@ protected void processElement(final D aDTO) {
* @return the {@link WebTarget} to access the url.
*/
protected WebTarget createWebTarget(final String aUrl) {
+crawler.getRateLimiter().acquire();
return client.target(aUrl);
}


DWTaskBase.java
@@ -1,10 +1,8 @@
package de.mediathekview.mserver.crawler.dw;

-import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

-import de.mediathekview.mlib.daten.Sender;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractRestTask;
import de.mediathekview.mserver.crawler.basic.CrawlerUrlDTO;
@@ -23,9 +21,6 @@
@SuppressWarnings("serial")
public abstract class DWTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(DWTaskBase.class);

-private static RateLimiter limiter = null;

private final transient GsonBuilder gsonBuilder;

protected DWTaskBase(
@@ -78,10 +73,6 @@ private Response executeRequest(final WebTarget aTarget) {
request.header(
ZdfConstants.HEADER_AUTHENTIFICATION, AUTHORIZATION_BEARER + authKey.get());
}
-if (limiter == null) {
-limiter = RateLimiter.create(crawler.getRuntimeConfig().getSenderConfig(Sender.DW).getMaximumRequestsPerSecond());
-}
-limiter.acquire();
return request.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP).get();
}
}

KikaApiFilmTask.java
@@ -117,8 +117,14 @@ protected void postProcessing(KikaApiVideoInfoDto aResponseObj, KikaApiFilmDto a
aFilm.setUrls(getVideoUrls(aResponseObj, aDTO));
aFilm.addAllSubtitleUrls(getSubtitle(aResponseObj, aDTO));
//
-taskResults.add(aFilm);
-crawler.incrementAndGetActualCount();
+
+
+if (!taskResults.add(aFilm)) {
+  LOG.debug("Rejected duplicate {}", aFilm);
+  crawler.incrementAndGetErrorCount();
+} else {
+  crawler.incrementAndGetActualCount();
+}
crawler.updateProgress();
}

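The duplicate handling above, like the matching change in SrFilmDetailTask below, relies on the java.util.Set contract: add() returns false when an equal element is already present. A minimal illustration; this Film record is a hypothetical stand-in for de.mediathekview.mlib.daten.Film, whose equals()/hashCode() actually decide what counts as a duplicate:

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class DuplicateCheckSketch {
  // Hypothetical stand-in; records derive equals()/hashCode() from their components.
  record Film(String sender, String title) {}

  public static void main(String[] args) {
    // Thread-safe set, the same construction AbstractCrawler uses for its films set.
    Set<Film> taskResults = ConcurrentHashMap.newKeySet();

    System.out.println(taskResults.add(new Film("KIKA", "Logo!"))); // true  -> actual count
    System.out.println(taskResults.add(new Film("KIKA", "Logo!"))); // false -> error count
  }
}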

SrFilmDetailTask.java
@@ -10,6 +10,7 @@
import de.mediathekview.mserver.crawler.ard.json.ArdVideoInfoDto;
import de.mediathekview.mserver.crawler.ard.json.ArdVideoInfoJsonDeserializer;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
+import de.mediathekview.mserver.crawler.basic.AbstractDocumentTask;
import de.mediathekview.mserver.crawler.basic.AbstractUrlTask;
import de.mediathekview.mserver.crawler.sr.SrTopicUrlDTO;
import org.apache.commons.lang3.StringUtils;
@@ -27,7 +28,7 @@
import java.time.format.DateTimeParseException;
import java.util.*;

-public class SrFilmDetailTask extends SrRateLimitedDocumentTask<Film, SrTopicUrlDTO> {
+public class SrFilmDetailTask extends AbstractDocumentTask<Film, SrTopicUrlDTO> {

private static final org.apache.logging.log4j.Logger LOG =
LogManager.getLogger(SrFilmDetailTask.class);
@@ -156,8 +157,12 @@ protected void processDocument(final SrTopicUrlDTO aUrlDTO, final Document aDocu

addUrls(film, videoInfo.getVideoUrls());

-taskResults.add(film);
-crawler.incrementAndGetActualCount();
+if (taskResults.add(film)) {
+  crawler.incrementAndGetActualCount();
+} else {
+  crawler.incrementAndGetErrorCount();
+  LOG.error("Rejected duplicate {}", film);
+}
crawler.updateProgress();
} else {
LOG.error("SrFilmDetailTask: no title or video found for url {}", aUrlDTO.getUrl());

SrRateLimitedDocumentTask.java
This file was deleted.


SrTopicArchivePageTask.java
@@ -2,6 +2,7 @@

import de.mediathekview.mserver.base.HtmlConsts;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
+import de.mediathekview.mserver.crawler.basic.AbstractDocumentTask;
import de.mediathekview.mserver.crawler.basic.AbstractUrlTask;
import de.mediathekview.mserver.crawler.sr.SrConstants;
import de.mediathekview.mserver.crawler.sr.SrTopicUrlDTO;
@@ -15,7 +16,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;

public class SrTopicArchivePageTask
-    extends SrRateLimitedDocumentTask<SrTopicUrlDTO, SrTopicUrlDTO> {
+    extends AbstractDocumentTask<SrTopicUrlDTO, SrTopicUrlDTO> {

private static final String NEXT_PAGE_SELECTOR = "div.pagination__item > a[title*=weiter]";
private static final String SHOW_SELECTOR = "h3.teaser__text__header";

ZdfTaskBase.java
@@ -1,9 +1,7 @@
package de.mediathekview.mserver.crawler.zdf.tasks;

-import com.google.common.util.concurrent.RateLimiter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
-import de.mediathekview.mlib.daten.Sender;
import de.mediathekview.mserver.crawler.basic.AbstractCrawler;
import de.mediathekview.mserver.crawler.basic.AbstractRestTask;
import de.mediathekview.mserver.crawler.basic.CrawlerUrlDTO;
@@ -20,9 +18,6 @@

public abstract class ZdfTaskBase<T, D extends CrawlerUrlDTO> extends AbstractRestTask<T, D> {
private static final Logger LOG = LogManager.getLogger(ZdfTaskBase.class);

-private static RateLimiter limiter = null;

private final GsonBuilder gsonBuilder;

protected ZdfTaskBase(
@@ -73,11 +68,6 @@ private Response executeRequest(final WebTarget aTarget) {
request.header(
ZdfConstants.HEADER_AUTHENTIFICATION, AUTHORIZATION_BEARER + authKey.get());
}
-if (limiter == null) {
-limiter = RateLimiter.create(crawler.getRuntimeConfig().getSenderConfig(Sender.ZDF).getMaximumRequestsPerSecond());
-}
-
-limiter.acquire();
return request.header(HEADER_ACCEPT_ENCODING, ENCODING_GZIP).get();
}
}