diff --git a/replication-plugin.xml b/replication-plugin.xml index f998ec6..6c34fa2 100644 --- a/replication-plugin.xml +++ b/replication-plugin.xml @@ -18,7 +18,7 @@ requestsQueueSize and requestsSendingThreadsCount attributes define thread pool requests from master to peers asynchronously. The default values are: 500 for requestsQueueSize and 1 for requestsSendingThreadsCount --> - + http://localhost:8083/nexus diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/NexusServer.java b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/NexusServer.java index a479160..4d78ae0 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/NexusServer.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/NexusServer.java @@ -28,16 +28,25 @@ @RequiredArgsConstructor @XmlAccessorType(XmlAccessType.FIELD) public class NexusServer { - @Getter @NonNull @XmlElement(name = "url") private String url; - @Getter @NonNull @XmlElement(name = "user") private String user; - @Getter @NonNull @XmlElement(name = "password") private String password; + + public String getUrl() { + return url; + } + + public String getUser() { + return user; + } + + public String getPassword() { + return password; + } } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java index 1855e61..47a4feb 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/model/config/ReplicationPluginConfiguration.java @@ -34,7 +34,6 @@ @RequiredArgsConstructor @XmlRootElement(name = "configurations") public class ReplicationPluginConfiguration { - @Getter @XmlElement(name = "server") @XmlElementWrapper(name = "servers") private final Set servers = new HashSet<>(); @@ -42,14 +41,30 @@ public class ReplicationPluginConfiguration { @NonNull @XmlAttribute(name = "myUrl") private String myUrl; - @Getter @XmlAttribute(name = "requestsQueueSize") private Integer requestsQueueSize = 500; - @Getter @XmlAttribute(name = "requestsSendingThreadsCount") private Integer requestsSendingThreadsCount = 1; + @XmlAttribute(name = "queueDumpFileName") + private String queueDumpFileName; public void addServer(NexusServer server) { servers.add(server); } + + public Integer getRequestsQueueSize() { + return requestsQueueSize; + } + + public String getQueueDumpFileName() { + return queueDumpFileName; + } + + public Integer getRequestsSendingThreadsCount() { + return requestsSendingThreadsCount; + } + + public Set getServers() { + return servers; + } } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/model/internal/ArtifactMetaInfoQueueDump.java b/src/main/java/com/griddynamics/cd/nrp/internal/model/internal/ArtifactMetaInfoQueueDump.java new file mode 100644 index 0000000..e635f42 --- /dev/null +++ b/src/main/java/com/griddynamics/cd/nrp/internal/model/internal/ArtifactMetaInfoQueueDump.java @@ -0,0 +1,46 @@ +/* + * Copyright 2015, Grid Dynamics International, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.griddynamics.cd.nrp.internal.model.internal; + +import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; + +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlElementWrapper; +import javax.xml.bind.annotation.XmlRootElement; +import java.util.HashSet; +import java.util.Set; + +/** + * DTO Class encapsulates artifact replication queue + */ +@XmlRootElement(name = "artifactMetaInfoBlockingQueueDump") +public class ArtifactMetaInfoQueueDump { + @XmlElement(name = "artifactMetaInfo") + @XmlElementWrapper(name = "artifactMetaInfos") + private final Set artifactMetaInfos = new HashSet<>(); + + public void addArtifactMetaInfo(ArtifactMetaInfo artifactMetaInfo) { + artifactMetaInfos.add(artifactMetaInfo); + } + public void addAllArtifactMetaInfo(Set artifactMetaInfo) { + artifactMetaInfos.addAll(artifactMetaInfo); + } + + public Set getArtifactMetaInfos() { + return artifactMetaInfos; + } + +} diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java index 3e6aeb2..5f6087a 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/ArtifactUpdateApiClient.java @@ -18,5 +18,6 @@ import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; public interface ArtifactUpdateApiClient { - void sendRequest(ArtifactMetaInfo metaInfo); + void offerRequest(ArtifactMetaInfo artifactMetaInfo); + } diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java index 9792f29..52cf9bf 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/ArtifactUpdateApiClientImpl.java @@ -16,6 +16,7 @@ package com.griddynamics.cd.nrp.internal.uploading.impl; import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; +import com.griddynamics.cd.nrp.internal.model.internal.ArtifactMetaInfoQueueDump; import com.griddynamics.cd.nrp.internal.model.api.RestResponse; import com.griddynamics.cd.nrp.internal.model.config.NexusServer; import com.griddynamics.cd.nrp.internal.uploading.ArtifactUpdateApiClient; @@ -34,6 +35,9 @@ import javax.inject.Singleton; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.UriBuilder; +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Unmarshaller; +import java.io.File; import java.util.concurrent.*; @Singleton @@ -50,23 +54,83 @@ public class ArtifactUpdateApiClientImpl extends ComponentSupport implements Art /** * ExecutorService shares between clients. All treads are created in the same executor */ - private final ExecutorService asyncRequestsExecutorService; + private final ExecutorService jerseyHttpClientExecutor; + private final FileBlockingQueue fileBlockingQueue; @Inject public ArtifactUpdateApiClientImpl(ConfigurationsManager configurationsManager) { this.configurationsManager = configurationsManager; - BlockingQueue queue = new LinkedBlockingQueue<>(configurationsManager.getConfiguration().getRequestsQueueSize()); - this.asyncRequestsExecutorService = new ThreadPoolExecutor( + this.fileBlockingQueue = initFileBlockingQueue(configurationsManager); + this.jerseyHttpClientExecutor = new ThreadPoolExecutor( configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), configurationsManager.getConfiguration().getRequestsSendingThreadsCount(), - 30, TimeUnit.SECONDS, queue); + 30, + TimeUnit.SECONDS, + new LinkedBlockingQueue( + configurationsManager.getConfiguration().getRequestsQueueSize()) + ); + initBackgroundWorkers(configurationsManager); + } + + private void initBackgroundWorkers(ConfigurationsManager configurationsManager) { + int requestsSendingThreadsCount = configurationsManager.getConfiguration() + .getRequestsSendingThreadsCount(); + ExecutorService executorService = Executors.newFixedThreadPool(requestsSendingThreadsCount); + for (int i = 0; i < requestsSendingThreadsCount; i++) { + executorService.submit(new Runnable() { + @Override + public void run() { + while (!Thread.currentThread().isInterrupted()) { + try { + final ArtifactMetaInfo artifactMetaInfo = fileBlockingQueue.peek(); + sendRequest(artifactMetaInfo); + fileBlockingQueue.take(); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + } + } + }); + } + } + + private FileBlockingQueue initFileBlockingQueue(ConfigurationsManager configurationsManager) { + BlockingQueue blockingQueue = + new LinkedBlockingQueue<>(configurationsManager. + getConfiguration().getRequestsQueueSize()); + String blockingQueueDumpFileName = configurationsManager.getConfiguration().getQueueDumpFileName(); + FileBlockingQueue retVal = new FileBlockingQueue(blockingQueue, + blockingQueueDumpFileName); + try { + File blockingQueueInputFile = new File(blockingQueueDumpFileName); + if (blockingQueueInputFile.exists()) { + JAXBContext jaxbContext = JAXBContext.newInstance(ArtifactMetaInfoQueueDump.class); + Unmarshaller unmarshaller = jaxbContext.createUnmarshaller(); + ArtifactMetaInfoQueueDump unmarshal = (ArtifactMetaInfoQueueDump) unmarshaller.unmarshal(blockingQueueInputFile); + for (ArtifactMetaInfo artifactMetaInfo : unmarshal.getArtifactMetaInfos()) { + offerRequest(artifactMetaInfo); + } + } + } catch (Exception e) { + log.error(e.getMessage(), e); + } + return retVal; + } + + @Override + public void offerRequest(ArtifactMetaInfo artifactMetaInfo) { + try { + fileBlockingQueue.offer(artifactMetaInfo, 30, TimeUnit.SECONDS); + } catch (Exception e) { + log.error(e.getMessage(), e); + } } /** * Sends replication requests to all nexus servers configured in XML file + * * @param metaInfo Artifact information */ - @Override public void sendRequest(ArtifactMetaInfo metaInfo) { for (NexusServer server : configurationsManager.getConfiguration().getServers()) { AsyncWebResource.Builder service = getService(server.getUrl(), server.getUser(), server.getPassword()); @@ -104,13 +168,15 @@ public GenericType getGenericType() { /** * Returns jersey HTTP resource to access to the remote replication servers + * * @param nexusUrl URL of the remote server - * @param login Username on the remote server + * @param login Username on the remote server * @param password User's password * @return Jersey HTTP client */ private AsyncWebResource.Builder getService(String nexusUrl, String login, String password) { Client client = getClient(login, password); + client.setExecutorService(jerseyHttpClientExecutor); AsyncWebResource webResource = client.asyncResource(UriBuilder.fromUri(nexusUrl).build()); webResource = webResource.path("service").path("local").path("artifact").path("maven").path("update"); return webResource.accept(MediaType.APPLICATION_XML_TYPE) @@ -119,14 +185,15 @@ private AsyncWebResource.Builder getService(String nexusUrl, String login, Strin /** * Creates jersey HTTP client - * @param login Username on the remote server + * + * @param login Username on the remote server * @param password User's password * @return HTTP client */ private Client getClient(String login, String password) { ClientConfig config = new DefaultClientConfig(); Client client = Client.create(config); - client.setExecutorService(asyncRequestsExecutorService); + client.setExecutorService(jerseyHttpClientExecutor); if (login != null && !login.isEmpty() && password != null) { log.debug("Creating HTTP client with authorized HTTPBasicAuthFilter."); client.addFilter(new HTTPBasicAuthFilter(login, password)); diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/FileBlockingQueue.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/FileBlockingQueue.java new file mode 100644 index 0000000..df27fb9 --- /dev/null +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/FileBlockingQueue.java @@ -0,0 +1,79 @@ +package com.griddynamics.cd.nrp.internal.uploading.impl; + +import com.google.common.collect.Sets; +import com.griddynamics.cd.nrp.internal.model.api.ArtifactMetaInfo; +import com.griddynamics.cd.nrp.internal.model.internal.ArtifactMetaInfoQueueDump; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.xml.bind.JAXBContext; +import javax.xml.bind.Marshaller; +import java.io.File; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +public class FileBlockingQueue { + + private final BlockingQueue internalBlockingQueue; + private final String blockingQueueDumpFileName; + + private Logger log = LoggerFactory.getLogger(FileBlockingQueue.class); + + public FileBlockingQueue(BlockingQueue blockingQueue, String blockingQueueDumpFileName) { + this.internalBlockingQueue = blockingQueue; + this.blockingQueueDumpFileName = blockingQueueDumpFileName; + } + + public boolean offer(ArtifactMetaInfo e, long timeout, TimeUnit timeUnit) throws InterruptedException { + synchronized (internalBlockingQueue) { + boolean retVal = internalBlockingQueue.offer(e, timeout, timeUnit); + saveQueueToFile(); + internalBlockingQueue.notify(); + return retVal; + } + } + + public ArtifactMetaInfo peek() throws InterruptedException { + synchronized (internalBlockingQueue) { + while (internalBlockingQueue.isEmpty()) { + internalBlockingQueue.wait(); + } + return internalBlockingQueue.peek(); + } + } + + public ArtifactMetaInfo take() throws InterruptedException { + synchronized (internalBlockingQueue) { + while (internalBlockingQueue.isEmpty()) { + internalBlockingQueue.wait(); + } + ArtifactMetaInfo retVal = internalBlockingQueue.take(); + saveQueueToFile(); + return retVal; + + } + } + + private synchronized void saveQueueToFile() { + + try { + String backupBlockingQueueDumpFileName = blockingQueueDumpFileName + ".bak"; + File blockingQueueDumpFile = new File(blockingQueueDumpFileName); + if (blockingQueueDumpFile.exists() && !blockingQueueDumpFile.isDirectory()) { + FileUtils.copyFile(blockingQueueDumpFile, new File(backupBlockingQueueDumpFileName)); + } + JAXBContext jaxbContext = JAXBContext.newInstance(ArtifactMetaInfoQueueDump.class); + Marshaller marshaller = jaxbContext.createMarshaller(); + ArtifactMetaInfoQueueDump artifactMetaInfoBlockingQueueDump = + new ArtifactMetaInfoQueueDump(); + artifactMetaInfoBlockingQueueDump.addAllArtifactMetaInfo(Sets.newHashSet(internalBlockingQueue)); + marshaller.marshal(artifactMetaInfoBlockingQueueDump, blockingQueueDumpFile); + } catch (Exception e) { + log.error(e.getMessage(), e); + } + + + } + +} diff --git a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java index 076532f..8bad9be 100644 --- a/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java +++ b/src/main/java/com/griddynamics/cd/nrp/internal/uploading/impl/UploadEventListenerImpl.java @@ -87,7 +87,7 @@ public void onArtifactUploading(RepositoryItemEventStore event) { updateArtifactStatus(metaInfo, artifactStatus); if (artifactStatus.isReadyForReplication()) { log.debug("File with hashes received for: " + metaInfo.toString() + " Sending request"); - artifactUpdateApiClient.sendRequest(metaInfo); + artifactUpdateApiClient.offerRequest(metaInfo); clearStatus(metaInfo); } }