diff --git a/pom.xml b/pom.xml index d594152d3..f5963b732 100644 --- a/pom.xml +++ b/pom.xml @@ -445,6 +445,11 @@ semver4j 3.1.0 + + com.github.peergos + nabu + v0.8.0 + com.github.oshi oshi-core diff --git a/src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java b/src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java index c47fb69c1..4b28019ed 100644 --- a/src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java +++ b/src/main/java/com/ghostchu/peerbanhelper/PeerBanHelperServer.java @@ -681,6 +681,7 @@ private void registerModules() { moduleManager.register(PBHTorrentController.class); moduleManager.register(PBHPeerController.class); moduleManager.register(PBHAlertController.class); + moduleManager.register(PBHFriendController.class); moduleManager.register(PBHLogsController.class); } diff --git a/src/main/java/com/ghostchu/peerbanhelper/config/MainConfigUpdateScript.java b/src/main/java/com/ghostchu/peerbanhelper/config/MainConfigUpdateScript.java index 1e63f29a6..42efc5af7 100644 --- a/src/main/java/com/ghostchu/peerbanhelper/config/MainConfigUpdateScript.java +++ b/src/main/java/com/ghostchu/peerbanhelper/config/MainConfigUpdateScript.java @@ -31,10 +31,10 @@ private void validate() { @UpdateScript(version = 22) public void miscChanges() { + conf.set("ipfs.port", 9899); conf.set("privacy", null); } - @UpdateScript(version = 21) public void addPushProvider(YamlConfiguration bundle) { conf.set("push-notification", bundle.get("push-notification")); diff --git a/src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java b/src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java index b43c69187..177dc4312 100644 --- a/src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java +++ b/src/main/java/com/ghostchu/peerbanhelper/database/DatabaseHelper.java @@ -41,6 +41,8 @@ private void createTables() throws SQLException { TableUtils.createTableIfNotExists(database.getDataSource(), ProgressCheatBlockerPersistEntity.class); TableUtils.createTableIfNotExists(database.getDataSource(), TrafficJournalEntity.class); TableUtils.createTableIfNotExists(database.getDataSource(), AlertEntity.class); + TableUtils.createTableIfNotExists(database.getDataSource(), FriendEntity.class); + TableUtils.createTableIfNotExists(database.getDataSource(), DHTRecordEntity.class); } private void performUpgrade() throws SQLException { diff --git a/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/DHTRecordDao.java b/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/DHTRecordDao.java new file mode 100644 index 000000000..4fc151575 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/DHTRecordDao.java @@ -0,0 +1,113 @@ +package com.ghostchu.peerbanhelper.database.dao.impl; + +import com.ghostchu.peerbanhelper.database.Database; +import com.ghostchu.peerbanhelper.database.dao.AbstractPBHDao; +import com.ghostchu.peerbanhelper.database.table.DHTRecordEntity; +import io.ipfs.multibase.binary.Base32; +import io.ipfs.multihash.Multihash; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import org.peergos.protocol.dht.RecordStore; +import org.peergos.protocol.ipns.IpnsRecord; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.sql.SQLException; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.Arrays; +import java.util.Deque; +import java.util.Optional; + +@Component +@Slf4j +public class DHTRecordDao extends AbstractPBHDao implements RecordStore { + private final int SIZE_OF_VAL = 10240; + private final int SIZE_OF_PEERID = 100; + + public DHTRecordDao(@Autowired Database database) throws SQLException { + super(database.getDataSource(), DHTRecordEntity.class); + } + + @Override + public void close() throws Exception { + + } + + @SneakyThrows + public void batchSave(Deque tasks) { + callBatchTasks(() -> { + while (!tasks.isEmpty()) { + var task = tasks.pop(); + try { + if (task.delete()) { + remove(task.key); + } else { + put(task.key, task.value); + } + } catch (Exception e) { + log.warn("Unable save {} to DHT records database", task); + } + } + return null; + }); + } + + private String hashToKey(Multihash hash) { + String padded = new Base32().encodeAsString(hash.toBytes()); + int padStart = padded.indexOf("="); + return padStart > 0 ? padded.substring(0, padStart) : padded; + } + + @Override + public Optional get(Multihash peerId) { + try { + var entity = queryForId(hashToKey(peerId)); + if (entity == null) return Optional.empty(); + return Optional.of( + new IpnsRecord( + entity.getRaw(), + entity.getSequence(), + entity.getTtlNanos(), + LocalDateTime.ofEpochSecond(entity.getExpiryUTC(), 0, ZoneOffset.UTC), + entity.getVal().getBytes() + ) + ); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void put(Multihash peerId, IpnsRecord record) { + try { + createOrUpdate(new DHTRecordEntity( + hashToKey(peerId), + record.raw, + record.sequence, + record.ttlNanos, + record.expiry.toEpochSecond(ZoneOffset.UTC), + new String(record.value.length > SIZE_OF_VAL ? + Arrays.copyOfRange(record.value, 0, SIZE_OF_VAL) : record.value) + )); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + @Override + public void remove(Multihash peerId) { + try { + delete(queryForEq("peerId", hashToKey(peerId))); + } catch (SQLException e) { + throw new IllegalStateException(e); + } + } + + public record PersistTask( + boolean delete, + Multihash key, + IpnsRecord value + ) { + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/FriendDao.java b/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/FriendDao.java new file mode 100644 index 000000000..8b1e600f1 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/database/dao/impl/FriendDao.java @@ -0,0 +1,38 @@ +package com.ghostchu.peerbanhelper.database.dao.impl; + +import com.ghostchu.peerbanhelper.database.Database; +import com.ghostchu.peerbanhelper.database.dao.AbstractPBHDao; +import com.ghostchu.peerbanhelper.database.table.FriendEntity; +import com.ghostchu.peerbanhelper.friend.Friend; +import com.j256.ormlite.table.TableUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.sql.SQLException; +import java.sql.Timestamp; +import java.util.Set; + +@Component +@Slf4j +public class FriendDao extends AbstractPBHDao { + public FriendDao(@Autowired Database database) throws SQLException { + super(database.getDataSource(), FriendEntity.class); + } + + public void saveFriendList(Set friends) throws SQLException { + callBatchTasks(() -> { + var entities = friends.stream().map(f -> new FriendEntity( + f.getPeerId(), + f.getPubKey(), + new Timestamp(f.getLastAttemptConnectTime()), + new Timestamp(f.getLastCommunicationTime()), + f.getLastRecordedPBHVersion(), + f.getLastRecordedConnectionStatus() + )).toList(); + TableUtils.clearTable(getConnectionSource(), FriendEntity.class); + create(entities); + return null; + }); + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/database/table/DHTRecordEntity.java b/src/main/java/com/ghostchu/peerbanhelper/database/table/DHTRecordEntity.java new file mode 100644 index 000000000..e3a7bdab2 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/database/table/DHTRecordEntity.java @@ -0,0 +1,27 @@ +package com.ghostchu.peerbanhelper.database.table; + +import com.j256.ormlite.field.DataType; +import com.j256.ormlite.field.DatabaseField; +import com.j256.ormlite.table.DatabaseTable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@AllArgsConstructor +@NoArgsConstructor +@Data +@DatabaseTable(tableName = "dht_records") +public final class DHTRecordEntity { + @DatabaseField(id = true, index = true, canBeNull = false, columnDefinition = "VARCHAR(100)") + private String peerId; + @DatabaseField(canBeNull = false, dataType = DataType.BYTE_ARRAY) + private byte[] raw; + @DatabaseField(canBeNull = false) + private long sequence; + @DatabaseField(canBeNull = false) + private long ttlNanos; + @DatabaseField(canBeNull = false) + private long expiryUTC; + @DatabaseField(canBeNull = false, columnDefinition = "VARCHAR(10240)") + private String val; +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/database/table/FriendEntity.java b/src/main/java/com/ghostchu/peerbanhelper/database/table/FriendEntity.java new file mode 100644 index 000000000..14f7b62b7 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/database/table/FriendEntity.java @@ -0,0 +1,29 @@ +package com.ghostchu.peerbanhelper.database.table; + +import com.j256.ormlite.field.DataType; +import com.j256.ormlite.field.DatabaseField; +import com.j256.ormlite.table.DatabaseTable; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.sql.Timestamp; + +@AllArgsConstructor +@NoArgsConstructor +@Data +@DatabaseTable(tableName = "friend") +public final class FriendEntity { + @DatabaseField(id = true, index = true) + private String peerId; + @DatabaseField(dataType = DataType.BYTE_ARRAY) + private byte[] pubKey; + @DatabaseField(canBeNull = false) + private Timestamp lastAttemptConnectTime; + @DatabaseField(canBeNull = false) + private Timestamp lastCommunicationTime; + @DatabaseField(canBeNull = false) + private String lastRecordedPBHVersion; + @DatabaseField(canBeNull = false) + private String lastRecordedConnectionStatus; +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/IPFS.java b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/IPFS.java new file mode 100644 index 000000000..2809874fb --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/IPFS.java @@ -0,0 +1,242 @@ +package com.ghostchu.peerbanhelper.decentralized.ipfs; + +import com.ghostchu.peerbanhelper.Main; +import com.ghostchu.peerbanhelper.decentralized.ipfs.impl.HybirdDHTRecordStore; +import com.ghostchu.peerbanhelper.text.Lang; +import com.ghostchu.peerbanhelper.web.JavalinWebContainer; +import com.ghostchu.peerbanhelper.web.Role; +import com.ghostchu.peerbanhelper.web.exception.IPAddressBannedException; +import com.ghostchu.peerbanhelper.web.wrapper.StdResp; +import com.ghostchu.simplereloadlib.ReloadResult; +import com.ghostchu.simplereloadlib.Reloadable; +import io.ipfs.multiaddr.MultiAddress; +import io.javalin.http.Context; +import io.javalin.http.HttpStatus; +import io.libp2p.core.PeerId; +import io.libp2p.core.crypto.PrivKey; +import io.libp2p.core.crypto.PubKey; +import io.libp2p.crypto.keys.Ed25519Kt; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.*; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.peergos.BlockRequestAuthoriser; +import org.peergos.EmbeddedIpfs; +import org.peergos.HostBuilder; +import org.peergos.blockstore.FileBlockstore; +import org.peergos.config.IdentitySection; +import org.peergos.protocol.http.HttpProtocol; +import org.springframework.stereotype.Component; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static com.ghostchu.peerbanhelper.text.TextManager.tlUI; + +@Component +@Slf4j +public class IPFS implements Reloadable { + private static final File dataDirectory = new File(Main.getDataDirectory(), "ipfs"); + private static final File blockStoreDirectory = new File(dataDirectory, "blockstore"); + + static { + if (!dataDirectory.exists()) { + dataDirectory.mkdirs(); + } + if (!blockStoreDirectory.exists()) { + blockStoreDirectory.mkdirs(); + } + } + + private final JavalinWebContainer webContainer; + private final HybirdDHTRecordStore hybirdDHTRecordStore; + + @Getter + private EmbeddedIpfs ipfs; + @Getter + private PrivKey identityEd25519Private; + @Getter + private PubKey identityEd25519Public; + + public IPFS(JavalinWebContainer webContainer, HybirdDHTRecordStore hybirdDHTRecordStore) throws IOException { + this.webContainer = webContainer; + this.hybirdDHTRecordStore = hybirdDHTRecordStore; + Main.getReloadManager().register(this); + Thread.ofVirtual().start(() -> { + try { + init(Main.getMainConfig().getInt("ipfs.port"), false); + } catch (IOException e) { + log.error("Unable to startup IPFS", e); + } + }); + } + + @Override + public ReloadResult reloadModule() throws Exception { + reloadConfig(); + return Reloadable.super.reloadModule(); + } + + private void reloadConfig() throws IOException { + if (!dataDirectory.exists()) { + dataDirectory.mkdirs(); + } + File privateKeyFile = new File(dataDirectory, "private-ed25519.key"); + File publicKeyFile = new File(dataDirectory, "public-ed25519.pub"); + if (!privateKeyFile.exists() || !publicKeyFile.exists()) { + var keyPair = Ed25519Kt.generateEd25519KeyPair(); + Files.write(privateKeyFile.toPath(), keyPair.component1().bytes()); + Files.write(publicKeyFile.toPath(), keyPair.component2().bytes()); + } else { + identityEd25519Private = Ed25519Kt.unmarshalEd25519PrivateKey(Files.readAllBytes(privateKeyFile.toPath())); + identityEd25519Public = Ed25519Kt.unmarshalEd25519PublicKey(Files.readAllBytes(publicKeyFile.toPath())); + } + } + + public void init(int port, boolean isRelayNode) throws IOException { + reloadConfig(); + webContainer.javalin().beforeMatched("/p2p-api", ctx -> { + if (ctx.routeRoles().isEmpty()) { + return; + } + if (ctx.routeRoles().contains(Role.ANYONE)) { + return; + } + if (!webContainer.allowAttemptLogin(ctx.ip())) { + throw new IPAddressBannedException(); + } + var token = ctx.header("X-P2P-LoopBack-WebAPI-Token"); + if (!webContainer.getToken().equals(token)) { + ctx.status(403); + ctx.json(new StdResp(false, "Unable to access P2P loop-back endpoints without valid webapi token (external access?)", null)); + webContainer.markLoginFailed(ctx.ip()); + } + }); + + List swarmAddresses = List.of(new MultiAddress("/ip6/::/tcp/" + port)); + List bootstrapAddresses = List.of( + new MultiAddress("/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa"), + new MultiAddress("/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb"), + new MultiAddress("/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt"), + new MultiAddress("/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), + new MultiAddress("/ip4/25.196.147.100/tcp/4001/p2p/QmaMqSwWShsPg2RbredZtoneFjXhim7AQkqbLxib45Lx4S"), + new MultiAddress("/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), // mars.i.ipfs.io + new MultiAddress("/ip4/104.131.131.82/tcp/4001/ipfs/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ"), + new MultiAddress("/ip4/104.236.179.241/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"), + new MultiAddress("/ip4/128.199.219.111/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu"), + new MultiAddress("/ip4/104.236.76.40/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"), + new MultiAddress("/ip4/178.62.158.247/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd"), + new MultiAddress("/ip6/2604:a880:1:20:0:0:203:d001/tcp/4001/ipfs/QmSoLPppuBtQSGwKDZT2M73ULpjvfd3aZ6ha4oFGL1KrGM"), + new MultiAddress("/ip6/2400:6180:0:d0:0:0:151:6001/tcp/4001/ipfs/QmSoLSafTMBsPKadTEgaXctDQVcqN88CNLHXMkTNwMKPnu"), + new MultiAddress("/ip6/2604:a880:800:10:0:0:4a:5001/tcp/4001/ipfs/QmSoLV4Bbm51jM9C4gDYZQ9Cy3U6aXMJDAbzgu2fzaDs64"), + new MultiAddress("/ip6/2a03:b0c0:0:1010:0:0:23:1001/tcp/4001/ipfs/QmSoLer265NRgSp2LA3dPaeykiS1J6DifTC88f5uVQKNAd") + ); + BlockRequestAuthoriser authoriser = (cid, peerid, auth) -> CompletableFuture.completedFuture(true); + HostBuilder builder = new HostBuilder().setPrivKey(identityEd25519Private); + PrivKey privKey = builder.getPrivateKey(); + PeerId peerId = builder.getPeerId(); + IdentitySection identity = new IdentitySection(privKey.bytes(), peerId); + boolean provideBlocks = true; + EmbeddedIpfs ipfs = EmbeddedIpfs.build( + hybirdDHTRecordStore, + new FileBlockstore(blockStoreDirectory.toPath()), + provideBlocks, + swarmAddresses, + bootstrapAddresses, + identity, + authoriser, + Optional.of(proxyHandler()) + ); + log.info(tlUI(Lang.IPFS_STARTING)); + ipfs.start(); + log.info(tlUI(Lang.IPFS_STARTED)); + this.ipfs = ipfs; + if (isRelayNode) { + PBHRelay.advertise(ipfs.dht, ipfs.node); + log.info(tlUI(Lang.IPFS_ADVERTISE_RELAY)); + } + } + + private void initJavalinEndpoints() { + webContainer.javalin() + .get("/p2p-api/heartbeat", ctx -> ctx.status(HttpStatus.NO_CONTENT)) + .get("/p2p-api/manifest", this::manifest) + .post("/p2p-api/keypair/sign", this::sign); + } + + private void sign(Context context) { + context.result(new String(identityEd25519Private.sign(context.bodyAsBytes()))); + } + + private void manifest(Context context) { + context.json(new IPFS.Manifest(Main.getMeta().getVersion())); + } + + + private HttpProtocol.HttpRequestProcessor proxyHandler() { + return (s, req, h) -> { + var host = webContainer.getHost(); + if (host.equals("0.0.0.0")) { + host = "127.0.0.1"; + } + if (host.equals("::") || host.equals("[::]")) { + host = "::1"; + } + SocketAddress httpTarget = new InetSocketAddress(host, webContainer.javalin().port()); + if (!req.uri().startsWith("/p2p-api/")) { + var ct = "Access denied - P2P can only access /p2p-api/ endpoints"; + FullHttpResponse replyOk = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.FORBIDDEN, Unpooled.wrappedBuffer(ct.getBytes(StandardCharsets.UTF_8))); + replyOk.headers().set(HttpHeaderNames.CONTENT_LENGTH, ct.length()); + h.accept(replyOk.retain()); + return; + } + req.headers().add("X-P2P-PeerID", s.remotePeerId().toBase58()); + req.headers().add("X-P2P-LoopBack-WebAPI-Token", webContainer.getToken()); + HttpProtocol.proxyRequest(req, httpTarget, h); + }; + } + + public record Manifest( + String version + ) { + + } +// +// public void checkRelays(PrivKey identityEd25519) { +// relayListOperationLock.lock(); +// try { +// var newRelays = PBHRelay.findRelays(ipfs.dht, ipfs.node); +// if (relays.isEmpty()) { +// return; +// } +// relays.clear(); +// relays.addAll(newRelays);; +// } finally { +// relays.forEach(pa->); +// relayListOperationLock.unlock(); +// } +// } +// +// public void reserveOnRelay(PrivKey identityEd25519, PeerAddresses relay){ +// HostBuilder builder = new HostBuilder().setPrivKey(identityEd25519); +// Multiaddr relayAddr = Multiaddr.fromString(relay.getPublicAddresses().get(0).toString()) +// .withP2P(PeerId.fromBase58(relay.peerId.toBase58())); +// CircuitHopProtocol.HopController hop = builder.getRelayHop().get().dial(ipfs.node, relayAddr).getController().join(); +// // ??? +// hop.reserve(); +// } +// +// public byte[] getData(Cid cid, @Nullable Set retrieveFrom, boolean persist) { +// List wants = List.of(new Want(cid)); +// boolean addToLocal = true; +// List blocks = ipfs.getBlocks(wants, retrieveFrom, addToLocal); +// byte[] data = blocks.get(0).block; +// } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/PBHRelay.java b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/PBHRelay.java new file mode 100644 index 000000000..fb6fb79c3 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/PBHRelay.java @@ -0,0 +1,29 @@ +package com.ghostchu.peerbanhelper.decentralized.ipfs; + +import io.ipfs.multihash.Multihash; +import io.libp2p.core.Host; +import org.peergos.Hash; +import org.peergos.PeerAddresses; +import org.peergos.protocol.dht.Kademlia; + +import java.nio.charset.StandardCharsets; +import java.util.List; + +public class PBHRelay { + public static final Multihash RELAY_RENDEZVOUS_NAMESPACE; + + static { + RELAY_RENDEZVOUS_NAMESPACE = new Multihash(Multihash.Type.sha2_256, Hash.sha256("/peerbanhelper/relay".getBytes(StandardCharsets.UTF_8))); + } + + public PBHRelay() { + } + + public static void advertise(Kademlia dht, Host us) { + dht.provideBlock(RELAY_RENDEZVOUS_NAMESPACE, us, PeerAddresses.fromHost(us)).join(); + } + + public static List findRelays(Kademlia dht, Host us) { + return dht.findProviders(RELAY_RENDEZVOUS_NAMESPACE, us, 20).join(); + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/impl/HybirdDHTRecordStore.java b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/impl/HybirdDHTRecordStore.java new file mode 100644 index 000000000..927ebc9a7 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/decentralized/ipfs/impl/HybirdDHTRecordStore.java @@ -0,0 +1,78 @@ +package com.ghostchu.peerbanhelper.decentralized.ipfs.impl; + +import com.ghostchu.peerbanhelper.database.dao.impl.DHTRecordDao; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalCause; +import io.ipfs.multihash.Multihash; +import org.peergos.protocol.dht.RecordStore; +import org.peergos.protocol.ipns.IpnsRecord; +import org.springframework.stereotype.Component; + +import java.util.Deque; +import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +@Component +public class HybirdDHTRecordStore implements RecordStore { + private final Deque persistTasks = new ConcurrentLinkedDeque<>(); + private final Cache records = CacheBuilder + .newBuilder() + .expireAfterAccess(30, TimeUnit.MINUTES) + .maximumSize(200) + .removalListener(notification -> { + if (notification.getCause() != RemovalCause.EXPLICIT) { + Multihash key = (Multihash) notification.getKey(); + IpnsRecord value = (IpnsRecord) notification.getValue(); + persistTasks.offer(new DHTRecordDao.PersistTask(false, key, value)); + } + }) + .build(); + private final DHTRecordDao dhtRecordDao; + private final ScheduledExecutorService scheduled; + + public HybirdDHTRecordStore(DHTRecordDao dhtRecordDao) { + this.dhtRecordDao = dhtRecordDao; + this.scheduled = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); + this.scheduled.scheduleWithFixedDelay(this::flush, 5, 5, TimeUnit.MINUTES); + } + + @Override + public void put(Multihash multihash, IpnsRecord ipnsRecord) { + records.put(multihash, ipnsRecord); + } + + @Override + public Optional get(Multihash multihash) { + var record = records.getIfPresent(multihash); + if (record == null) { + record = dhtRecordDao.get(multihash).orElse(null); + } + if (record != null) { + records.put(multihash, record); + } + return Optional.ofNullable(record); + } + + @Override + public void remove(Multihash multihash) { + //dhtRecordDao.remove(multihash); + persistTasks.offer(new DHTRecordDao.PersistTask(true, multihash, null)); + records.invalidate(multihash); + } + + private void flush() { + dhtRecordDao.batchSave(persistTasks); + } + + @Override + public void close() throws Exception { + scheduled.shutdownNow(); + records.asMap().forEach((hash, record) -> persistTasks.offer(new DHTRecordDao.PersistTask(false, hash, record))); + records.invalidateAll(); + flush(); + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/friend/Friend.java b/src/main/java/com/ghostchu/peerbanhelper/friend/Friend.java new file mode 100644 index 000000000..cfb7f437b --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/friend/Friend.java @@ -0,0 +1,123 @@ +package com.ghostchu.peerbanhelper.friend; + +import com.ghostchu.peerbanhelper.Main; +import com.ghostchu.peerbanhelper.decentralized.ipfs.IPFS; +import com.ghostchu.peerbanhelper.util.json.JsonUtil; +import io.ipfs.multihash.Multihash; +import io.libp2p.core.PeerId; +import io.libp2p.crypto.keys.Ed25519Kt; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.*; +import lombok.Getter; +import org.peergos.EmbeddedIpfs; +import org.peergos.protocol.http.HttpProtocol; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.UUID; + +public class Friend { + @Getter + private final String peerId; + @Getter + private final byte[] pubKey; + private final transient IPFS ipfs; + @Getter + private long lastAttemptConnectTime; + @Getter + private long lastCommunicationTime; + @Getter + private String lastRecordedPBHVersion; + @Getter + private transient boolean connected; + @Getter + private String lastRecordedConnectionStatus; + private transient HttpProtocol.HttpController controller; + + public Friend(IPFS ipfs, String peerId, byte[] pubKey, long lastAttemptConnectTime, long lastCommunicationTime, String lastRecordedPBHVersion) { + this.ipfs = ipfs; + this.pubKey = pubKey; + this.peerId = peerId; + this.lastAttemptConnectTime = lastAttemptConnectTime; + this.lastCommunicationTime = lastCommunicationTime; + this.lastRecordedPBHVersion = lastRecordedPBHVersion; + } + + public HttpProtocol.HttpController connectAndGetController() { + if (!connect()) { + return null; + } + return controller; + } + + public boolean connect() { + if (controller != null) { + FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/p2p-api/heartbeat"); + httpRequest.headers().add("User-Agent", Main.getUserAgent()); + if (controller.send(httpRequest).join().status() == HttpResponseStatus.NO_CONTENT) { + return connectSuccess("Connection valid", controller); + } + } + lastAttemptConnectTime = System.currentTimeMillis(); + var ipfs = this.ipfs.getIpfs(); + if (ipfs == null) { + return connectFailed("IPFS component not ready, try again later."); + } + try { + var addrs = EmbeddedIpfs.getAddresses(ipfs.node, ipfs.dht, Multihash.fromBase58(peerId)); + HttpProtocol.HttpController proxier = ipfs.p2pHttp.get().dial(ipfs.node, PeerId.fromBase58(peerId), addrs).getController().join(); + byte[] proofData = UUID.randomUUID().toString().getBytes(StandardCharsets.ISO_8859_1); + FullHttpRequest peerTrustTest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "/p2p-api/keypair/sign", Unpooled.wrappedBuffer(proofData)); + peerTrustTest.headers().set(HttpHeaderNames.CONTENT_LENGTH, proofData.length); + if (!Ed25519Kt.unmarshalEd25519PublicKey(pubKey).verify(proofData, extractString(proxier.send(peerTrustTest).join().content()).getBytes(StandardCharsets.ISO_8859_1))) { + return connectFailed("Signature verify failed, MITM?"); + } + FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/manifest"); + var response = proxier.send(httpRequest).join(); + if (response.status() == HttpResponseStatus.OK) { + var manifest = JsonUtil.standard().fromJson(response.content().toString(), IPFS.Manifest.class); + lastRecordedPBHVersion = manifest.version(); + return connectSuccess("Connected", proxier); + } else { + return connectFailed("Connected, but no excepted response received."); + } + } catch (Exception e) { + return connectFailed(e.getClass().getName() + ": " + e.getMessage()); + } + } + + private String extractString(ByteBuf buf) { + return buf.getCharSequence(0, buf.readableBytes(), StandardCharsets.ISO_8859_1).toString(); + } + + private boolean connectSuccess(String reason, HttpProtocol.HttpController controller) { + lastRecordedConnectionStatus = reason; + lastCommunicationTime = System.currentTimeMillis(); + connected = true; + this.controller = controller; + System.out.println("Connected: " + reason); + return true; + } + + private boolean connectFailed(String reason) { + lastRecordedConnectionStatus = reason; + connected = false; + System.out.println("Connect failed: " + reason); + return false; + } + + @Override + public boolean equals(Object object) { + if (this == object) return true; + if (object == null || getClass() != object.getClass()) return false; + Friend friend = (Friend) object; + return Objects.equals(peerId, friend.peerId); + } + + @Override + public int hashCode() { + return Objects.hashCode(peerId); + } + +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/friend/FriendManager.java b/src/main/java/com/ghostchu/peerbanhelper/friend/FriendManager.java new file mode 100644 index 000000000..ca6bba344 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/friend/FriendManager.java @@ -0,0 +1,75 @@ +package com.ghostchu.peerbanhelper.friend; + +import com.ghostchu.peerbanhelper.database.dao.impl.FriendDao; +import com.ghostchu.peerbanhelper.decentralized.ipfs.IPFS; +import com.ghostchu.peerbanhelper.text.Lang; +import com.ghostchu.peerbanhelper.web.JavalinWebContainer; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import java.sql.SQLException; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static com.ghostchu.peerbanhelper.text.TextManager.tlUI; + +@Component +@Slf4j +public class FriendManager implements AutoCloseable { + @Getter + private final Set friends = new CopyOnWriteArraySet<>(); + private final ScheduledExecutorService reconnectExecutor = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); + private final ScheduledExecutorService saveExecutor = Executors.newScheduledThreadPool(1, Thread.ofVirtual().factory()); + private final JavalinWebContainer webContainer; + private FriendDao friendDao; + private IPFS ipfs; + + public FriendManager(FriendDao friendDao, IPFS ipfs, JavalinWebContainer webContainer) throws SQLException { + this.friendDao = friendDao; + this.ipfs = ipfs; + this.webContainer = webContainer; + var friendsInDb = friendDao.queryForAll(); + int ct = 0; + for (var f : friendsInDb) { + friends.add(new Friend( + ipfs, + f.getPeerId(), + f.getPubKey(), + f.getLastAttemptConnectTime().getTime(), + f.getLastCommunicationTime().getTime(), + f.getLastRecordedPBHVersion() + )); + ct++; + } + log.info(tlUI(Lang.FRIEND_LOADED, ct)); + reconnectExecutor.scheduleWithFixedDelay(this::reconnect, 0, 5, TimeUnit.MINUTES); + saveExecutor.scheduleWithFixedDelay(this::flush, 0, 10, TimeUnit.MINUTES); + + } + + public void flush() { + try { + friendDao.saveFriendList(friends); + } catch (Exception e) { + log.error("Failed to save friends status to database", e); + } + } + + public void close() { + flush(); + } + + private void reconnect() { + try (var vt = Executors.newVirtualThreadPerTaskExecutor()) { + for (Friend friend : friends) { + vt.submit(() -> { + friend.connect(); + }); + } + } + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/module/impl/webapi/PBHFriendController.java b/src/main/java/com/ghostchu/peerbanhelper/module/impl/webapi/PBHFriendController.java new file mode 100644 index 000000000..ed560c217 --- /dev/null +++ b/src/main/java/com/ghostchu/peerbanhelper/module/impl/webapi/PBHFriendController.java @@ -0,0 +1,107 @@ +package com.ghostchu.peerbanhelper.module.impl.webapi; + +import com.ghostchu.peerbanhelper.database.dao.impl.HistoryDao; +import com.ghostchu.peerbanhelper.database.dao.impl.PeerRecordDao; +import com.ghostchu.peerbanhelper.database.dao.impl.TorrentDao; +import com.ghostchu.peerbanhelper.decentralized.ipfs.IPFS; +import com.ghostchu.peerbanhelper.friend.FriendManager; +import com.ghostchu.peerbanhelper.module.AbstractFeatureModule; +import com.ghostchu.peerbanhelper.util.context.IgnoreScan; +import com.ghostchu.peerbanhelper.web.JavalinWebContainer; +import com.ghostchu.peerbanhelper.web.Role; +import com.ghostchu.peerbanhelper.web.wrapper.StdResp; +import io.javalin.http.Context; +import lombok.extern.slf4j.Slf4j; +import org.jetbrains.annotations.NotNull; +import org.springframework.stereotype.Component; + +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; + +@Component +@IgnoreScan +@Slf4j +public class PBHFriendController extends AbstractFeatureModule { + private final JavalinWebContainer javalinWebContainer; + private final TorrentDao torrentDao; + private final PeerRecordDao peerRecordDao; + private final HistoryDao historyDao; + private final FriendManager friendManager; + private final IPFS ipfs; + + + public PBHFriendController(JavalinWebContainer javalinWebContainer, TorrentDao torrentDao, PeerRecordDao peerRecordDao, HistoryDao historyDao, FriendManager friendManager, IPFS ipfs) { + this.javalinWebContainer = javalinWebContainer; + this.torrentDao = torrentDao; + this.historyDao = historyDao; + this.peerRecordDao = peerRecordDao; + this.friendManager = friendManager; + this.ipfs = ipfs; + } + + @Override + public boolean isConfigurable() { + return false; + } + + @Override + public @NotNull String getName() { + return "Friend Controller"; + } + + @Override + public @NotNull String getConfigName() { + return "friend-controller"; + } + + @Override + public void onEnable() { + javalinWebContainer + .javalin() + //.get("/api/torrent", this::handleTorrentQuery, Role.USER_READ) + .get("/api/friend/list", this::handleFriendList, Role.USER_READ) + .get("/api/friend/metadata", this::handleFriendMetadata, Role.USER_READ); + } + + private void handleFriendMetadata(Context context) { + Map meta = new HashMap<>(); + if (ipfs.getIpfs() == null) { + context.json(new StdResp(false, "IPFS starting up", null)); + return; + } + meta.put("publicKey", Base64.getEncoder().encodeToString(ipfs.getIdentityEd25519Public().bytes())); + meta.put("peerId", ipfs.getIpfs().node.getPeerId().toHex()); + context.json(new StdResp(true, null, meta)); + } + + private void handleFriendList(Context context) { + var list = friendManager.getFriends().stream().map( + friend -> new FriendDto(friend.getPeerId(), + Base64.getEncoder().encodeToString(friend.getPubKey()), + friend.getLastAttemptConnectTime(), + friend.getLastCommunicationTime(), + friend.getLastRecordedPBHVersion(), + friend.isConnected(), + friend.getLastRecordedConnectionStatus()) + ).toList(); + context.json(new StdResp(true, null, list)); + } + + @Override + public void onDisable() { + + } + + public record FriendDto( + String peerId, + String pubKey, + long lastAttemptConnectTime, + long lastCommunicationTime, + String lastRecordedPBHVersion, + boolean connected, + String lastRecordedConnectionStatus + ) { + + } +} diff --git a/src/main/java/com/ghostchu/peerbanhelper/text/Lang.java b/src/main/java/com/ghostchu/peerbanhelper/text/Lang.java index ef9131a76..c342525b6 100644 --- a/src/main/java/com/ghostchu/peerbanhelper/text/Lang.java +++ b/src/main/java/com/ghostchu/peerbanhelper/text/Lang.java @@ -386,6 +386,10 @@ public enum Lang { BTN_RECONFIGURE_DISABLED_BY_SERVER, BTN_RECONFIGURE_PREPARE_RECONFIGURE, UNABLE_SET_SQLITE_OPTIMIZED_PRAGMA, + IPFS_STARTING, + IPFS_ADVERTISE_RELAY, + IPFS_STARTED, + FRIEND_LOADED; THREAD_INTERRUPTED, DOWNLOADER_BITCOMET_UNABLE_FETCH_TASK_SUMMARY, ALERT_SNAPSHOT, diff --git a/src/main/java/com/ghostchu/peerbanhelper/web/JavalinWebContainer.java b/src/main/java/com/ghostchu/peerbanhelper/web/JavalinWebContainer.java index 01545fbed..6177646d7 100644 --- a/src/main/java/com/ghostchu/peerbanhelper/web/JavalinWebContainer.java +++ b/src/main/java/com/ghostchu/peerbanhelper/web/JavalinWebContainer.java @@ -44,6 +44,8 @@ public class JavalinWebContainer { private Cache FAIL2BAN = CacheBuilder.newBuilder() .expireAfterWrite(15, TimeUnit.MINUTES) .build(); + @Getter + private String host; public JavalinWebContainer(ActivationManager activationManager) { JsonMapper gsonMapper = new JsonMapper() { @@ -169,6 +171,7 @@ public boolean isContextAuthorized(Context ctx) { } public void start(String host, int port, String token) { + this.host = host; this.token = token; javalin.start(host, port); } diff --git a/src/main/resources/config.yml b/src/main/resources/config.yml index ef88b4132..d3d63d9fd 100644 --- a/src/main/resources/config.yml +++ b/src/main/resources/config.yml @@ -150,7 +150,8 @@ performance: # Enable EcoQoS API on Windows Platform for power saving, for exchange, the program performance will reduce and cronjobs may delay # https://devblogs.microsoft.com/performance-diagnostics/introducing-ecoqos/ windows-ecoqos-api: true - +ipfs: + port: 9899 push-notification: # 邮件推送服务 - Email push notification email-example: diff --git a/src/main/resources/lang/en_us/messages.yml b/src/main/resources/lang/en_us/messages.yml index 1773d61f2..49be159b6 100644 --- a/src/main/resources/lang/en_us/messages.yml +++ b/src/main/resources/lang/en_us/messages.yml @@ -417,6 +417,10 @@ BTN_STAND_BY: "Ready" BTN_RECONFIGURE_DISABLED_BY_SERVER: "BTN server disabled reconfigure" BTN_RECONFIGURE_PREPARE_RECONFIGURE: "Detected new version of configuration, reconfiguring..." UNABLE_SET_SQLITE_OPTIMIZED_PRAGMA: "Failed to set optimized SQLite PRAGMA arguments" +IPFS_STARTING: "[IPFS] PeerBanHelper connecting to IPFS, please wait..." +IPFS_STARTED: "[IPFS] IPFS connected, decentralized features now enabled" +FRIEND_LOADED: "[Friend] Loaded {} friends" +IPFS_ADVERTISE_RELAY: "[IPFS] PeerBanHelper announced this node as a Relay node, you will be help other users routing the traffic on this IPFS network" THREAD_INTERRUPTED: "Thread interrupted, operation cancelled" DOWNLOADER_BITCOMET_UNABLE_FETCH_TASK_SUMMARY: "Cannot retrieve BitComet task details" ALERT_SNAPSHOT: "You are currently using a non-stable version of PeerBanHelper" @@ -473,4 +477,4 @@ PROGRAM_OUT_OF_MEMORY_DESCRIPTION: | Maximum memory: {}MB Please increase PeerBanHelper's memory allocation immediately. -BAN_PEER_EXCEPTION: "An exception occurred while ban Peer, specific Peer unable to ban, Please report these information to PeerBanHelper developer" \ No newline at end of file +BAN_PEER_EXCEPTION: "An exception occurred while ban Peer, specific Peer unable to ban, Please report these information to PeerBanHelper developer" diff --git a/src/main/resources/lang/messages_fallback.yml b/src/main/resources/lang/messages_fallback.yml index 25d590f34..f3c88e52f 100644 --- a/src/main/resources/lang/messages_fallback.yml +++ b/src/main/resources/lang/messages_fallback.yml @@ -416,6 +416,10 @@ BTN_STAND_BY: "就绪" BTN_RECONFIGURE_DISABLED_BY_SERVER: "BTN 服务器已禁用重新配置" BTN_RECONFIGURE_PREPARE_RECONFIGURE: "检测到新的版本,正在重新配置" UNABLE_SET_SQLITE_OPTIMIZED_PRAGMA: "设置 SQLite 优化的 PRAGMA 参数时出现错误" +IPFS_STARTING: "[IPFS] PeerBanHelper 正在连接到 IPFS 网络,请稍等" +IPFS_STARTED: "[IPFS] PeerBanHelper 已连接到 IPFS 网络,去中心化网络功能已启用" +FRIEND_LOADED: "[好友] 已加载 {} 个好友" +IPFS_ADVERTISE_RELAY: "[IPFS] PeerBanHelper 已宣告此 IPFS 节点为网络中继节点 (Relay),您将帮助 IPFS 网络上的其它用户中继流量" THREAD_INTERRUPTED: "线程强制终止,操作已被取消" DOWNLOADER_BITCOMET_UNABLE_FETCH_TASK_SUMMARY: "无法获取 BitComet 任务详情信息" ALERT_SNAPSHOT: "您当前正在使用的是 PeerBanHelper 的非稳定版本" @@ -472,4 +476,4 @@ PROGRAM_OUT_OF_MEMORY_DESCRIPTION: | 最大内存: {}MB 请立即增加 PeerBanHelper 的内存分配。 -BAN_PEER_EXCEPTION: "封禁 Peer 时出现意外错误,指定 Peer 可能封禁失败,请将下面的信息报告给 PeerBanHelper 开发者" \ No newline at end of file +BAN_PEER_EXCEPTION: "封禁 Peer 时出现意外错误,指定 Peer 可能封禁失败,请将下面的信息报告给 PeerBanHelper 开发者" diff --git a/src/main/resources/lang/zh_cn/messages.yml b/src/main/resources/lang/zh_cn/messages.yml index a8c61eb23..8ad8660e7 100644 --- a/src/main/resources/lang/zh_cn/messages.yml +++ b/src/main/resources/lang/zh_cn/messages.yml @@ -415,6 +415,10 @@ BTN_STAND_BY: "就绪" BTN_RECONFIGURE_DISABLED_BY_SERVER: "BTN 服务器已禁用重新配置" BTN_RECONFIGURE_PREPARE_RECONFIGURE: "检测到新的版本,正在重新配置" UNABLE_SET_SQLITE_OPTIMIZED_PRAGMA: "设置 SQLite 优化的 PRAGMA 参数时出现错误" +IPFS_STARTING: "[IPFS] PeerBanHelper 正在连接到 IPFS 网络,请稍等" +IPFS_STARTED: "[IPFS] PeerBanHelper 已连接到 IPFS 网络,去中心化网络功能已启用" +FRIEND_LOADED: "[好友] 已加载 {} 个好友" +IPFS_ADVERTISE_RELAY: "[IPFS] PeerBanHelper 已宣告此 IPFS 节点为网络中继节点 (Relay),您将帮助 IPFS 网络上的其它用户中继流量" THREAD_INTERRUPTED: "线程强制终止,操作已被取消" DOWNLOADER_BITCOMET_UNABLE_FETCH_TASK_SUMMARY: "无法获取 BitComet 任务详情信息" ALERT_SNAPSHOT: "您当前正在使用的是 PeerBanHelper 的非稳定版本" @@ -471,4 +475,4 @@ PROGRAM_OUT_OF_MEMORY_DESCRIPTION: | 最大内存: {}MB 请立即增加 PeerBanHelper 的内存分配。 -BAN_PEER_EXCEPTION: "封禁 Peer 时出现意外错误,指定 Peer 可能封禁失败,请将下面的信息报告给 PeerBanHelper 开发者" \ No newline at end of file +BAN_PEER_EXCEPTION: "封禁 Peer 时出现意外错误,指定 Peer 可能封禁失败,请将下面的信息报告给 PeerBanHelper 开发者"