diff --git a/build.gradle b/build.gradle index 40c0baf6f36b971623f182f6c5547a2f58f74e92..03129fdf56927a4857754835bd789ac50fc27ce7 100644 --- a/build.gradle +++ b/build.gradle @@ -104,7 +104,7 @@ List lombok = [ allprojects { group "com.webank.weevent" - version "1.5.0" + version "1.6.0" apply plugin: "java-library" apply plugin: "maven" diff --git a/weevent-broker/src/main/java/com/webank/weevent/broker/db/InitialDb.java b/weevent-broker/src/main/java/com/webank/weevent/broker/db/InitialDb.java index 6a9ad8b01bcebf62f9a2d76d174a98fe12cd9b9d..cc754749fbe47807a6eaa8f43e94991c907f42f2 100644 --- a/weevent-broker/src/main/java/com/webank/weevent/broker/db/InitialDb.java +++ b/weevent-broker/src/main/java/com/webank/weevent/broker/db/InitialDb.java @@ -47,13 +47,15 @@ public class InitialDb implements AutoCloseable { if (flag) { databaseType = "mysql"; } - // first use dbself database - int first = goalUrl.lastIndexOf("/"); - int end = goalUrl.lastIndexOf("?"); - this.dbName = flag ? goalUrl.substring(first + 1, end) : goalUrl.substring(first + 1); + int first = goalUrl.lastIndexOf("/") + 1; + int endTag = goalUrl.indexOf("?"); + if (endTag == -1) { + endTag = goalUrl.length(); + } + this.dbName = goalUrl.substring(first, endTag); // get mysql default url like jdbc:mysql://127.0.0.1:3306 String defaultUrl = flag ? goalUrl.substring(0, first) : goalUrl; - + log.info("dbName:{},defaultUrl:{}, {}", this.dbName, defaultUrl, databaseType); Class.forName(driverName); List tableSqlList = readSql(); diff --git a/weevent-broker/src/main/resources/application-dev.properties b/weevent-broker/src/main/resources/application-dev.properties index 207f91be5ccf40917a026587923826c162af318d..5d8f8dcbdb649f2b99bf9a4a91b996727ce070f6 100644 --- a/weevent-broker/src/main/resources/application-dev.properties +++ b/weevent-broker/src/main/resources/application-dev.properties @@ -20,6 +20,14 @@ spring.pid.fail-on-write-error=true spring.pid.file=./logs/${spring.application.name}.pid spring.resources.add-mappings=false +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_broker?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_broker spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root diff --git a/weevent-broker/src/main/resources/application-prod.properties b/weevent-broker/src/main/resources/application-prod.properties index 207f91be5ccf40917a026587923826c162af318d..5d8f8dcbdb649f2b99bf9a4a91b996727ce070f6 100644 --- a/weevent-broker/src/main/resources/application-prod.properties +++ b/weevent-broker/src/main/resources/application-prod.properties @@ -20,6 +20,14 @@ spring.pid.fail-on-write-error=true spring.pid.file=./logs/${spring.application.name}.pid spring.resources.add-mappings=false +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_broker?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_broker spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root diff --git a/weevent-broker/src/main/resources/fisco.yml b/weevent-broker/src/main/resources/fisco.yml index 6d1c9164b027f2ad19369b0b82cc3b7dead6f215..79b5ca1b42bd255e582db39063ceff664e5adf94 100644 --- a/weevent-broker/src/main/resources/fisco.yml +++ b/weevent-broker/src/main/resources/fisco.yml @@ -7,8 +7,6 @@ timeout: 10000 poolSize: 10 maxPoolSize: 200 keepAliveSeconds: 10 -web3sdkEncryptType: ECDSA_TYPE -#web3sdkEncryptType: SM_TYPE consumerHistoryMergeBlock: 8 consumerIdleTime: 1000 diff --git a/weevent-broker/src/main/resources/weevent.properties b/weevent-broker/src/main/resources/weevent.properties index 8492118122bdd82b6ca5cfa9af464d2a386e437b..0384c6dada7b7bddf0a3696b75494ef67f20ee9e 100644 --- a/weevent-broker/src/main/resources/weevent.properties +++ b/weevent-broker/src/main/resources/weevent.properties @@ -10,7 +10,7 @@ stomp.heartbeats=30 mqtt.broker.tcp.port=7001 mqtt.broker.keepalive=60 mqtt.broker.security.ssl=false -mqtt.broker.security.ssl.client_auth=true +mqtt.broker.security.ssl.client_auth=false mqtt.broker.security.ssl.ca_cert:mqtt_ssl/cacert.pem mqtt.broker.security.ssl.server_cert:mqtt_ssl/cert.pem mqtt.broker.security.ssl.server_key:mqtt_ssl/key.pem \ No newline at end of file diff --git a/weevent-build/config.properties b/weevent-build/config.properties index 636e170a06a709e2fdcf7e99c3f4bd0b64cc4922..8fa6e1ff6132c395482cc07e37154f681effce4f 100644 --- a/weevent-build/config.properties +++ b/weevent-build/config.properties @@ -4,8 +4,6 @@ JAVA_HOME=/usr/local/jdk1.8.0_191 # Required module fisco-bcos # support 2.x fisco-bcos.version=2.0 -# FISCO-BCOS node channel, eg: 127.0.0.1:20200;127.0.0.2:20200 -fisco-bcos.channel=127.0.0.1:20200 # The path of FISCO-BCOS 2.0 that contain certificate file ca.crt/sdk.crt/sdk.key fisco-bcos.node_path=~/fisco/nodes/127.0.0.1/sdk/ diff --git a/weevent-build/install-all.sh b/weevent-build/install-all.sh index eee6e0ca56973c3bca7e389797d861baa6bf6024..2c82427349232c106e37678761f5d23e28e69f2a 100644 --- a/weevent-build/install-all.sh +++ b/weevent-build/install-all.sh @@ -13,7 +13,6 @@ governance_port=7009 processor_port=7008 zookeeper_connect_string= block_chain_version= -block_chain_channel= block_chain_node_path= database_type= mysql_ip= @@ -23,6 +22,8 @@ mysql_password= current_path=$PWD +fisco_config_file=$PWD/fisco.yml + function yellow_echo (){ local what=$* echo -e "\e[1;33m${what} \e[0m" @@ -49,7 +50,6 @@ function properties_get(){ function set_global_param(){ java_home_path=$(properties_get "JAVA_HOME") block_chain_version=$(properties_get "fisco-bcos.version") - block_chain_channel=$(properties_get "fisco-bcos.channel") block_chain_node_path=$(properties_get "fisco-bcos.node_path") if [[ "${block_chain_node_path:0:1}" == "~" ]];then block_chain_node_path=$(realpath -m ${HOME}/${block_chain_node_path:1}) @@ -126,7 +126,6 @@ function check_param(){ check_port $(echo ${zookeeper_connect_string} | cut -d ":" -f2) fi if [[ -d ${block_chain_node_path} ]]; then - check_telnet ${block_chain_channel} echo "param ok" else echo "path not exist, ${block_chain_node_path}" @@ -160,7 +159,7 @@ function install_module(){ yellow_echo "install module broker" cd ${current_path}/modules/broker - ./install-broker.sh --out_path ${out_path}/broker --listen_port ${broker_port} --block_chain_node_path ${block_chain_node_path} --channel_info ${block_chain_channel} --version ${block_chain_version} --zookeeper_connect_string ${zookeeper_connect_string} + ./install-broker.sh --out_path ${out_path}/broker --listen_port ${broker_port} --block_chain_node_path ${block_chain_node_path} --version ${block_chain_version} --zookeeper_connect_string ${zookeeper_connect_string} --fisco_config_file ${fisco_config_file} check_result "install broker" if [[ ${governance_enable} = "true" ]];then diff --git a/weevent-build/modules/broker/install-broker.sh b/weevent-build/modules/broker/install-broker.sh index 01e41613a680d08b02b4dd73c888575f4fb44ffa..b53c9aa1631443ff2e2f474752a001879e285b11 100644 --- a/weevent-build/modules/broker/install-broker.sh +++ b/weevent-build/modules/broker/install-broker.sh @@ -8,9 +8,9 @@ while [[ $# -ge 2 ]] ; do --out_path) para="$1 = $2;";out_path="$2";shift 2;; --listen_port) para="$1 = $2;";listen_port="$2";shift 2;; --block_chain_node_path) para="$1 = $2;";block_chain_node_path="$2";shift 2;; - --channel_info) para="$1 = $2;";channel_info="$2";shift 2;; --version) para="$1 = $2;";version="$2";shift 2;; --zookeeper_connect_string) para="$1 = $2;";zookeeper_connect_string="$2";shift 2;; + --fisco_config_file) para="$1 = $2;";fisco_config_file="$2";shift 2;; *) echo "unknown parameter $1." ; exit 1 ; break;; esac done @@ -19,33 +19,38 @@ echo "param out_path: ${out_path}" echo "param listen_port: ${listen_port}" echo "param version: ${version}" echo "param block_chain_node_path: ${block_chain_node_path}" -echo "param channel_info: ${channel_info}" echo "param zookeeper_connect_string: ${zookeeper_connect_string}" +echo "param fisco_config_file: ${fisco_config_file}" + #copy file function copy_file(){ mkdir -p ${out_path} cp -r ./* ${out_path}/ + cp ${fisco_config_file} ${out_path}/conf rm -f ${out_path}/install-broker.sh } -if [[ -z ${channel_info} ]];then - echo "channel_info is empty." - exit 1 -fi - copy_file -sed -i "s/^.*nodes=.*$/nodes=${channel_info}/g" ${out_path}/conf/fisco.properties + echo "set channel_info success" -if [[ -f ${block_chain_node_path}/ca.crt ]] && [[ -f ${block_chain_node_path}/sdk.crt ]] && [[ -f ${block_chain_node_path}/sdk.key ]]; then - rm -rf ${out_path}/conf/ca.crt ${out_path}/conf/sdk.crt ${out_path}/conf/sdk.key - cp ${block_chain_node_path}/ca.crt ${block_chain_node_path}/sdk.crt ${block_chain_node_path}/sdk.key ${out_path}/conf/ +if [[ -d ${block_chain_node_path} ]]; then + rm -rf ${out_path}/conf/conf + mkdir -p ${out_path}/conf/conf + cp -rf ${block_chain_node_path}/* ${out_path}/conf/conf else echo "ca.crt or sdk.crt or sdk.key is not exist." exit 1 fi +if [[ -f ${fisco_config_file} ]]; then + cp -rf ${fisco_config_file} ${out_path}/conf/ +else + echo "fisco_config_file is not exist." + exit 1 +fi + #deploy contract cd ${out_path} ./deploy-topic-control.sh @@ -66,6 +71,46 @@ else fi echo "set lister_port success" +if [[ ${database_type} != "h2" ]];then + switch_database_to_mysql "${application_properties}" + + if [[ -z ${mysql_ip} ]];then + echo "mysql_ip is empty." + echo "set mysql_ip failed" + exit 1 + else + sed -i "s/127.0.0.1:3306/${mysql_ip}:3306/" ${application_properties} + fi + echo "set mysql_ip success" + + if [[ -z ${mysql_port} ]];then + echo "mysql_port is empty." + echo "set mysql_port failed" + exit 1 + else + sed -i "s/3306/${mysql_port}/" ${application_properties} + fi + echo "set mysql_port success" + + if [[ -z ${mysql_user} ]];then + echo "mysql_user is empty." + echo "set mysql_user failed" + exit 1 + else + sed -i "s/xxxx/${mysql_user}/" ${application_properties} + fi + echo "set mysql_user success" + + if [[ -z ${mysql_pwd} ]];then + echo "mysql_pwd is empty" + echo "set mysql_pwd failed" + exit 1 + else + sed -i "s/yyyy/${mysql_pwd}/" ${application_properties} + fi + echo "set mysql_pwd success" +fi + if [[ -n ${zookeeper_connect_string} ]];then sed -i "/spring.cloud.zookeeper.connect-string=/cspring.cloud.zookeeper.connect-string=${zookeeper_connect_string}" ${out_path}/conf/application-prod.properties else @@ -74,4 +119,22 @@ else fi echo "set zookeeper_connect_string success" +# init db, create database and tables +cd ${out_path} +./init-broker.sh +if [[ $? -ne 0 ]];then + + echo "Error,init mysql fail" + exit 1 +fi +echo "init db success" + +function switch_database_to_mysql() { + mysql_config_line=$(cat -n $1|grep 'spring.jpa.database=mysql'|awk '{print $1}'|head -1) + sed -i ''$mysql_config_line','$((mysql_config_line+4))'s/^#//' $1 + + h2_config_line=$(cat -n $1|grep 'spring.jpa.database=h2'|awk '{print $1}'|head -1) + sed -i ''$h2_config_line','$((h2_config_line+4))'s/^/#/' $1 +} + echo "broker module install success" diff --git a/weevent-build/modules/governance/install-governance.sh b/weevent-build/modules/governance/install-governance.sh index b63b2f6b63071a5fda2072819139d4d603d47580..580ea8a019c639201792d1fccac733cd6ab94480 100644 --- a/weevent-build/modules/governance/install-governance.sh +++ b/weevent-build/modules/governance/install-governance.sh @@ -8,9 +8,9 @@ function governance_setup() { cp -r ./* ${out_path}/ install_top_path=$(dirname ${out_path}) - cp ${install_top_path}/broker/conf/ca.crt ${out_path}/conf/ - cp ${install_top_path}/broker/conf/sdk.* ${out_path}/conf/ - cp ${install_top_path}/broker/conf/fisco.properties ${out_path}/conf/ + rm -rf ${out_path}/conf/conf + cp -rf ${install_top_path}/broker/conf/conf ${out_path}/conf + cp -rf ${install_top_path}/broker/conf/fisco.yml ${out_path}/conf/ rm -f ${out_path}/install-governance.sh diff --git a/weevent-build/package.sh b/weevent-build/package.sh index 7df24e260cdd5245feaea921766dc13f61e90d25..bd84a847fbc7da5fef4271bce9e984b0a461c313 100644 --- a/weevent-build/package.sh +++ b/weevent-build/package.sh @@ -97,6 +97,7 @@ function copy_install_file(){ cd ${current_path} cp ${current_path}/config.properties ${current_path}/install-all.sh ${out_path} + cp ${current_path}/fisco.yml ${current_path}/install-all.sh ${out_path} cp -r ${current_path}/bin ${out_path} mkdir -p ${out_path}/modules/gateway diff --git a/weevent-core/src/main/java/com/webank/weevent/core/config/WeEventCoreConfig.java b/weevent-core/src/main/java/com/webank/weevent/core/config/WeEventCoreConfig.java index ab691b40aa7f3b2a4f3b1dc884f191c352891996..3281265fb76807a422a5835e75984cdda8a2d9a2 100644 --- a/weevent-core/src/main/java/com/webank/weevent/core/config/WeEventCoreConfig.java +++ b/weevent-core/src/main/java/com/webank/weevent/core/config/WeEventCoreConfig.java @@ -27,8 +27,6 @@ public class WeEventCoreConfig { private Integer keepAliveSeconds; - private String web3sdkEncryptType; - private Integer consumerIdleTime; private Integer consumerHistoryMergeBlock; diff --git a/weevent-core/src/main/java/com/webank/weevent/core/fisco/util/Web3sdkUtils.java b/weevent-core/src/main/java/com/webank/weevent/core/fisco/util/Web3sdkUtils.java index 99efc184991507b686406beae0e43ffcea0d11eb..26fd6c594c2faadffb32d5591b466e52faf1ee96 100644 --- a/weevent-core/src/main/java/com/webank/weevent/core/fisco/util/Web3sdkUtils.java +++ b/weevent-core/src/main/java/com/webank/weevent/core/fisco/util/Web3sdkUtils.java @@ -61,7 +61,7 @@ public class Web3sdkUtils { taskExecutor.initialize(); if (StringUtils.isBlank(fiscoConfig.getWeEventCoreConfig().getVersion())) { - log.error("empty FISCO-BCOS version in fisco.properties"); + log.error("empty FISCO-BCOS version in fisco.yml"); systemExit(1); } diff --git a/weevent-core/src/main/java/com/webank/weevent/core/fisco/web3sdk/FiscoBcosDelegate.java b/weevent-core/src/main/java/com/webank/weevent/core/fisco/web3sdk/FiscoBcosDelegate.java index fad93c192dbda8cdab91a656ef62480246dd2612..d9ae442f7df65679c25f8388f5af1b3c0f459e8e 100644 --- a/weevent-core/src/main/java/com/webank/weevent/core/fisco/web3sdk/FiscoBcosDelegate.java +++ b/weevent-core/src/main/java/com/webank/weevent/core/fisco/web3sdk/FiscoBcosDelegate.java @@ -32,7 +32,7 @@ import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; /** - * Detect FISCO-BCOS version from configuration 'fisco.properties' and then proxy all the invoke to the target. + * Detect FISCO-BCOS version from configuration 'fisco.yml' and then proxy all the invoke to the target. * Parameter groupId in all interface: * default 1L in 2.x, meanings first group * There is 2 different caches for block data. One is local memory, another is redis. @@ -76,11 +76,11 @@ public class FiscoBcosDelegate { config.getWeEventCoreConfig().getKeepAliveSeconds()); if (StringUtils.isBlank(config.getWeEventCoreConfig().getVersion())) { - log.error("the fisco version in fisco.properties is empty"); + log.error("the fisco version in fisco.yml is empty"); throw new BrokerException(ErrorCode.WEB3SDK_INIT_ERROR); } if (0 == config.getConfigProperty().getNetwork().size()) { - log.error("the fisco nodes in fisco.properties is null"); + log.error("the fisco nodes in fisco.yml is null"); throw new BrokerException(ErrorCode.WEB3SDK_INIT_ERROR); } diff --git a/weevent-core/src/main/resources/fisco.yml b/weevent-core/src/main/resources/fisco.yml index 2ce28b37c3d8c174a227867cb3d160e8a8bdd506..f841d678d9c5f04c7e35546f9ef9b9efd1d37606 100644 --- a/weevent-core/src/main/resources/fisco.yml +++ b/weevent-core/src/main/resources/fisco.yml @@ -7,8 +7,6 @@ timeout: 10000 poolSize: 10 maxPoolSize: 200 keepAliveSeconds: 10 -web3sdkEncryptType: ECDSA_TYPE -#web3sdkEncryptType: SM_TYPE consumerHistoryMergeBlock: 8 consumerIdleTime: 1000 diff --git a/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoBcosBroker4ProducerTest.java b/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoBcosBroker4ProducerTest.java index 67864776fc6c75be99323a027216f8f6a6d85778..01fb0a34c90b94c4269a920fe7942b9c7e6ed39a 100644 --- a/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoBcosBroker4ProducerTest.java +++ b/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoBcosBroker4ProducerTest.java @@ -40,6 +40,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.fisco.bcos.sdk.model.CryptoType.ECDSA_TYPE; + /** * FiscoBcosBroker4Producer Tester. * @@ -57,6 +59,7 @@ public class FiscoBcosBroker4ProducerTest extends JUnitTestBase { private FiscoConfig fiscoConfig; private FiscoBcosDelegate fiscoBcosDelegate; private long transactionTimeout = 30000; + private int cryptoType = ECDSA_TYPE; @Before public void before() throws Exception { @@ -456,7 +459,7 @@ public class FiscoBcosBroker4ProducerTest extends JUnitTestBase { } private CryptoKeyPair getExternalAccountCryptoKeyPair() { - if (fiscoConfig.getWeEventCoreConfig().getWeb3sdkEncryptType().equals("ECDSA_TYPE")) { + if (cryptoType == ECDSA_TYPE){ return (new ECDSAKeyPair()).generateKeyPair(); } else { return (new SM2KeyPair()).generateKeyPair(); diff --git a/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoTopicAdminTest.java b/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoTopicAdminTest.java index 4dde6b4cff4e38fe6e2cc32e4dc031682265d666..3d094b5b3d597b3a13f2c16a732c29c82bf915b3 100644 --- a/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoTopicAdminTest.java +++ b/weevent-core/src/test/java/com/webank/weevent/core/fisco/FiscoTopicAdminTest.java @@ -30,6 +30,8 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.fisco.bcos.sdk.model.CryptoType.ECDSA_TYPE; + /** * FiscoBcosTopicAdmin Tester. * @@ -50,6 +52,8 @@ public class FiscoTopicAdminTest extends JUnitTestBase { private final long transactionTimeout = 10; private FiscoConfig fiscoConfig; + private int cryptoType = ECDSA_TYPE; + @Before public void before() throws Exception { log.info("=============================={}.{}==============================", @@ -1103,7 +1107,7 @@ public class FiscoTopicAdminTest extends JUnitTestBase { } private CryptoKeyPair getExternalAccountCryptoKeyPair() { - if (fiscoConfig.getWeEventCoreConfig().getWeb3sdkEncryptType().equals("ECDSA_TYPE")) { + if ( cryptoType == ECDSA_TYPE) { return (new ECDSAKeyPair()).generateKeyPair(); } else { return (new SM2KeyPair()).generateKeyPair(); diff --git a/weevent-file/src/main/java/com/webank/weevent/file/inner/AMOPChannel.java b/weevent-file/src/main/java/com/webank/weevent/file/inner/AMOPChannel.java index 69ab998d303fce96792cb8b6824bff3c6b0a22bb..43048162d02cc6a832fb798d2e42984bbbd47d6a 100644 --- a/weevent-file/src/main/java/com/webank/weevent/file/inner/AMOPChannel.java +++ b/weevent-file/src/main/java/com/webank/weevent/file/inner/AMOPChannel.java @@ -35,6 +35,7 @@ import org.fisco.bcos.sdk.amop.AmopMsgOut; import org.fisco.bcos.sdk.amop.AmopResponse; import org.fisco.bcos.sdk.amop.topic.AmopMsgIn; import org.fisco.bcos.sdk.amop.topic.TopicType; +import org.fisco.bcos.sdk.client.protocol.response.NodeInfo; import org.fisco.bcos.sdk.client.protocol.response.Peers; import org.fisco.bcos.sdk.client.protocol.response.Peers.PeerInfo; import org.fisco.bcos.sdk.crypto.keystore.KeyTool; @@ -115,15 +116,43 @@ public class AMOPChannel extends AmopCallback { public Set getSubscribers(String topic) { Integer groupId = Integer.parseInt(WeEvent.DEFAULT_GROUP_ID); - Set subscribers = new HashSet<>(); - Peers peers = this.bcosSDK.getClient(groupId).getPeers(); - log.info("peers:{}", peers.getPeers()); - for (Peers.PeerInfo peer : peers.getPeers()){ - if(peer.getTopic().contains(topic)){ - subscribers.add(peer.getIpAndPort()); + Set subscriberIPs = new HashSet<>(); + Set subscriberNodeIds = new HashSet<>(); + + try { + List peers = this.bcosSDK.getClient(groupId).getPeers().getPeers(); + log.info("getSubscribers: peers {}, {}", topic, peers); + for (Peers.PeerInfo peer : peers){ + if(peer.getTopic().contains(topic)){ + subscriberIPs.add(peer.getIpAndPort()); + subscriberNodeIds.add(peer.getNodeID()); + log.info("subscribers peer {}, {}", topic, peer.getNodeID(), peer.getIpAndPort()); + } + } + } catch (Exception e) { + log.error("getPeers error:{} {}", topic, e.toString()); + } + + + List connectedNodes = this.bcosSDK.getConfig().getNetworkConfig().getPeers(); + log.info("getSubscribers: nodes {}", connectedNodes); + for (String nodeIp : connectedNodes){ + NodeInfo.NodeInformation nodeInfo = null; + try { + nodeInfo = this.bcosSDK.getClient(groupId).getNodeInfo(nodeIp).getNodeInfo(); + } catch (Exception e) { + log.error("getNodeInfo error:{} {} {}", topic, nodeIp, e.toString()); + continue; + } + log.info("getSubscribers: node {}, {}, {}", topic, nodeIp, nodeInfo); + if(nodeInfo.getTopic().contains(topic)){ + subscriberIPs.add(nodeInfo.getIpAndPort()); + subscriberNodeIds.add(nodeInfo.getNodeID()); + log.info("subscribers node {}, {}", topic, nodeInfo.getNodeID(), nodeInfo.getIpAndPort()); } } - return subscribers; + log.info("getSubscribers:{} {}", subscriberIPs, subscriberNodeIds); + return subscriberNodeIds; } // Receiver call subscribe topic diff --git a/weevent-file/src/main/resources/fisco.yml b/weevent-file/src/main/resources/fisco.yml index d2c2ee6f8646446f287d2c7c9b60b082a9733ee6..8a3d59d65d7a20b1fa5b3c23b325e105496684b1 100644 --- a/weevent-file/src/main/resources/fisco.yml +++ b/weevent-file/src/main/resources/fisco.yml @@ -7,7 +7,6 @@ timeout: 10000 poolSize: 10 maxPoolSize: 200 keepAliveSeconds: 10 -web3sdkEncryptType: ECDSA_TYPE consumerHistoryMergeBlock: 8 consumerIdleTime: 1000 diff --git a/weevent-file/src/test/java/com/webank/weevent/file/WeEventFileClientTest.java b/weevent-file/src/test/java/com/webank/weevent/file/WeEventFileClientTest.java index db199434c69ea27de8025f88ca98f3019111c7ea..2b3913f877860d93a92766457dad250a81d776b9 100644 --- a/weevent-file/src/test/java/com/webank/weevent/file/WeEventFileClientTest.java +++ b/weevent-file/src/test/java/com/webank/weevent/file/WeEventFileClientTest.java @@ -3,6 +3,7 @@ package com.webank.weevent.file; import java.io.File; import java.util.ArrayList; import java.util.List; +import java.util.Set; import com.webank.weevent.client.BrokerException; import com.webank.weevent.client.SendResult; @@ -355,7 +356,6 @@ public class WeEventFileClientTest { @Test public void testGetPemFile() throws BrokerException { -// WeEventFileClient.genPemFile(fiscoConfig.getWeEventCoreConfig().getWeb3sdkEncryptType()); WeEventFileClient.genPemFile("SM_TYPE"); Assert.assertTrue(true); } @@ -368,4 +368,13 @@ public class WeEventFileClientTest { boolean ret = weEventFileClient.isFileExist("conf/ca.crt", this.topicName, this.groupId); Assert.assertTrue(ret); } + + @Test + public void testGetSubscribers() throws BrokerException { + WeEventFileClient weEventFileClient = new WeEventFileClient(this.groupId, this.localReceivePath, this.fileChunkSize, this.fiscoConfig); + Set subscribers = weEventFileClient.getSubscribers(this.topicName); + System.out.println("subscribers:" + subscribers); + Assert.assertTrue(true); + } + } diff --git a/weevent-governance/src/main/java/com/webank/weevent/governance/GovernanceApplication.java b/weevent-governance/src/main/java/com/webank/weevent/governance/GovernanceApplication.java index 5e638719e1ec78fce11942f31e39abf25f8a6867..bdb68baf8b2381807fc6e7e6f92ed6eaeb67e550 100644 --- a/weevent-governance/src/main/java/com/webank/weevent/governance/GovernanceApplication.java +++ b/weevent-governance/src/main/java/com/webank/weevent/governance/GovernanceApplication.java @@ -5,6 +5,7 @@ import java.net.UnknownHostException; import javax.net.ssl.SSLException; +import com.webank.weevent.core.config.FiscoConfig; import com.webank.weevent.governance.common.GovernanceConfig; import lombok.extern.slf4j.Slf4j; @@ -64,6 +65,13 @@ public class GovernanceApplication { governanceConfig = config; } + @Bean + public FiscoConfig fiscoConfig(){ + FiscoConfig fiscoConfig = new FiscoConfig(); + fiscoConfig.load(""); + return fiscoConfig; + } + @Bean public ServletRegistrationBean weeventGovernanceServletBean(WebApplicationContext wac) { DispatcherServlet ds = new DispatcherServlet(wac); diff --git a/weevent-governance/src/main/java/com/webank/weevent/governance/initial/InitialDb.java b/weevent-governance/src/main/java/com/webank/weevent/governance/initial/InitialDb.java index f5db466d5433a4d424caa1a2abc21b827b6ff4c6..11d01c715ff4f52e34a48b634de73ab9707bd890 100644 --- a/weevent-governance/src/main/java/com/webank/weevent/governance/initial/InitialDb.java +++ b/weevent-governance/src/main/java/com/webank/weevent/governance/initial/InitialDb.java @@ -47,13 +47,15 @@ public class InitialDb implements AutoCloseable { if (flag) { databaseType = "mysql"; } - // first use dbself database - int first = goalUrl.lastIndexOf("/"); - int end = goalUrl.lastIndexOf("?"); - this.dbName = flag ? goalUrl.substring(first + 1, end) : goalUrl.substring(first + 1); + int first = goalUrl.lastIndexOf("/") + 1; + int endTag = goalUrl.indexOf("?"); + if (endTag == -1) { + endTag = goalUrl.length(); + } + this.dbName = goalUrl.substring(first, endTag); // get mysql default url like jdbc:mysql://127.0.0.1:3306 String defaultUrl = flag ? goalUrl.substring(0, first) : goalUrl; - + log.info("dbName:{},defaultUrl:{}, {}", this.dbName, defaultUrl, databaseType); Class.forName(driverName); List tableSqlList = readSql(); diff --git a/weevent-governance/src/main/java/com/webank/weevent/governance/service/TopicService.java b/weevent-governance/src/main/java/com/webank/weevent/governance/service/TopicService.java index ea9cf343489ed4f053b548d53f1dc086e4506b5c..f8daf036a0bd7fcdf041ea5b167baa20ff1f8b2a 100644 --- a/weevent-governance/src/main/java/com/webank/weevent/governance/service/TopicService.java +++ b/weevent-governance/src/main/java/com/webank/weevent/governance/service/TopicService.java @@ -5,8 +5,10 @@ import java.net.URLEncoder; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -25,6 +27,7 @@ import com.webank.weevent.governance.entity.TopicPage; import com.webank.weevent.governance.entity.TopicPageEntity; import com.webank.weevent.governance.enums.IsDeleteEnum; import com.webank.weevent.governance.repository.TopicRepository; +import com.webank.weevent.governance.utils.Utils; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; @@ -59,6 +62,9 @@ public class TopicService { @Autowired private CommonService commonService; + @Autowired + private FiscoConfig fiscoConfig; + private final String SPLIT = "-"; public Boolean close(Integer brokerId, String topic, String groupId, HttpServletRequest request, HttpServletResponse response) @@ -97,7 +103,7 @@ public class TopicService { TopicPage result = new TopicPage(); result.setPageIndex(pageIndex); result.setPageSize(pageSize); - result.setNodeAddress(governanceConfig.getNodeAddressList()); + result.setNodeAddress(this.getNodeAddressList()); if (brokerEntity == null) { return result; } @@ -116,7 +122,7 @@ public class TopicService { TopicPage topicPage = invokeBrokerCGI(request, url, new TypeReference>() { }).getData(); - topicPage.setNodeAddress(governanceConfig.getNodeAddressList()); + topicPage.setNodeAddress(this.getNodeAddressList()); if (topicPage == null || CollectionUtils.isEmpty(topicPage.getTopicInfoList())) { return result; } @@ -265,4 +271,13 @@ public class TopicService { } } + private List getNodeAddressList() { + Set nodeAddressList = new HashSet<>(); + nodeAddressList.add(Utils.list2String(fiscoConfig.getFiscoNodes())); + if (governanceConfig.getNodeAddressList() != null) { + nodeAddressList.addAll(governanceConfig.getNodeAddressList()); + } + + return new ArrayList<>(nodeAddressList); + } } diff --git a/weevent-governance/src/main/resources/application-dev.properties b/weevent-governance/src/main/resources/application-dev.properties index 55c7243432ebe7ed2ad9597d3992988f444a1abd..f14a4de8e6e16d7a6409117ba32ffe3997d29bba 100644 --- a/weevent-governance/src/main/resources/application-dev.properties +++ b/weevent-governance/src/main/resources/application-dev.properties @@ -1,6 +1,14 @@ spring.application.name=weevent-governance server.port=7009 logging.config=classpath:log4j2.xml +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_governance?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_governance spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root diff --git a/weevent-governance/src/main/resources/application-prod.properties b/weevent-governance/src/main/resources/application-prod.properties index 17d2b813ef7c0a657a0234d2a6a258abe4586f5d..b50f84605c10430616e2bf6595f6ae5dfa04ef05 100644 --- a/weevent-governance/src/main/resources/application-prod.properties +++ b/weevent-governance/src/main/resources/application-prod.properties @@ -1,6 +1,14 @@ spring.application.name=weevent-governance server.port=7009 logging.config=classpath:log4j2.xml +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_governance?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_governance spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root diff --git a/weevent-governance/src/main/resources/fisco.yml b/weevent-governance/src/main/resources/fisco.yml index f3f7c90476f8df79d4c29a7af76ab108d6a53391..5207732c2533569777fe193f24366a294a7a0570 100644 --- a/weevent-governance/src/main/resources/fisco.yml +++ b/weevent-governance/src/main/resources/fisco.yml @@ -7,7 +7,6 @@ timeout: 10000 poolSize: 10 maxPoolSize: 200 keepAliveSeconds: 10 -web3sdkEncryptType: ECDSA_TYPE consumerHistoryMergeBlock: 8 consumerIdleTime: 1000 diff --git a/weevent-governance/src/main/resources/governance.properties b/weevent-governance/src/main/resources/governance.properties index ed045f7e3196c386eccbbd02f0a717028e50318a..c420b449273edeb19d5fddb2c408758debd9682b 100644 --- a/weevent-governance/src/main/resources/governance.properties +++ b/weevent-governance/src/main/resources/governance.properties @@ -10,6 +10,6 @@ file.transport.path=./logs acount.name=admin acount.password=admin123456 -nodeAddressList[0]=127.0.0.1:20200 -nodeAddressList[1]=127.0.0.1:20201 -nodeAddressList[2]=127.0.0.1:20202,127.0.0.1:20203 \ No newline at end of file +#nodeAddressList[0]=127.0.0.1:20200 +#nodeAddressList[1]=127.0.0.1:20201 +#nodeAddressList[2]=127.0.0.1:20202,127.0.0.1:20203 \ No newline at end of file diff --git a/weevent-governance/src/main/resources/script/governance_h2.sql b/weevent-governance/src/main/resources/script/governance_h2.sql index a00e1455d1aff8733906b988c1ead12ceead8043..4b094b6b37ebbf6ec3ff57d1eaf9e147f23b305a 100644 --- a/weevent-governance/src/main/resources/script/governance_h2.sql +++ b/weevent-governance/src/main/resources/script/governance_h2.sql @@ -160,6 +160,6 @@ CREATE TABLE t_timer_scheduler ( alter table t_broker add CONSTRAINT brokerUrlDeleteAt UNIQUE (broker_url, delete_at) ; alter table t_rule_engine add CONSTRAINT ruleNameDeleteAt UNIQUE (rule_name, delete_at); alter table t_topic add CONSTRAINT topicNameBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id,delete_at); -alter table t_file_transport_channel add CONSTRAINT topicBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id); +alter table t_file_transport_channel add CONSTRAINT topicBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id,node_address,role); alter table t_topic_historical add CONSTRAINT brokerIdGroupIdEventId UNIQUE (brokerId, groupId, eventId); alter table t_file_transport_status add CONSTRAINT topicBrokerGroupFileName UNIQUE (broker_id,group_id,topic_name,file_name); \ No newline at end of file diff --git a/weevent-governance/src/main/resources/script/governance_mysql.sql b/weevent-governance/src/main/resources/script/governance_mysql.sql index a031b4d44abaf1b88b17d7785fb2b66a4cc93530..b24efabf684b1fa224f2b1605e66d1b4c199a83c 100644 --- a/weevent-governance/src/main/resources/script/governance_mysql.sql +++ b/weevent-governance/src/main/resources/script/governance_mysql.sql @@ -158,6 +158,6 @@ CREATE TABLE t_timer_scheduler ( ALTER TABLE t_broker ADD CONSTRAINT brokerUrlDeleteAt UNIQUE (broker_url, delete_at) ; ALTER TABLE t_rule_engine ADD CONSTRAINT ruleNameDeleteAt UNIQUE (rule_name, delete_at); ALTER TABLE t_topic ADD CONSTRAINT topicNameBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id,delete_at); -ALTER TABLE t_file_transport_channel ADD CONSTRAINT topicBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id); +ALTER TABLE t_file_transport_channel ADD CONSTRAINT topicBrokerGroupDelete UNIQUE (topic_name,broker_id,group_id,node_address,role); ALTER TABLE t_topic_historical ADD CONSTRAINT brokerIdGroupIdEventId UNIQUE (brokerId, groupId, eventId); ALTER TABLE t_file_transport_status ADD CONSTRAINT topicBrokerGroupFileName UNIQUE (topic_name,broker_id,group_id,file_name); \ No newline at end of file diff --git a/weevent-governance/web/src/components/fileTranspoart.vue b/weevent-governance/web/src/components/fileTranspoart.vue index b79aab8dae401a8391c1c1131770fcc7dc22b310..cd168ee524b704d95d6d053fb836b64f257c755e 100644 --- a/weevent-governance/web/src/components/fileTranspoart.vue +++ b/weevent-governance/web/src/components/fileTranspoart.vue @@ -490,11 +490,11 @@ export default { if (e.role === '0') { vm.downLoadList[name] = setInterval(fun => { vm.downStatus(e) - }, 1000) + }, 2000) } else { vm.downLoadList[name] = setInterval(fun => { vm.upStatus(e) - }, 1000) + }, 2000) } } }, diff --git a/weevent-processor/src/main/java/com/webank/weevent/processor/db/InitialDb.java b/weevent-processor/src/main/java/com/webank/weevent/processor/db/InitialDb.java index bd4cda5265b55e8d90d088baf10cecd8c12a679d..3add6a21f55896b4e18a3d4580f5fd9f5dc9411a 100644 --- a/weevent-processor/src/main/java/com/webank/weevent/processor/db/InitialDb.java +++ b/weevent-processor/src/main/java/com/webank/weevent/processor/db/InitialDb.java @@ -40,29 +40,28 @@ public class InitialDb implements AutoCloseable { } private void createDataBase() throws Exception { - try { - String goalUrl = properties.getProperty("spring.datasource.url"); - this.user = properties.getProperty("spring.datasource.username"); - this.password = properties.getProperty("spring.datasource.password"); - String driverName = properties.getProperty("spring.datasource.driverClassName"); - boolean flag = driverName.contains("mariadb"); - if (flag) { - databaseType = "mysql"; - } - int first = goalUrl.lastIndexOf("/") + 1; - this.dbName = goalUrl.substring(first); - // get mysql default url like jdbc:mysql://127.0.0.1:3306 - String defaultUrl = flag ? goalUrl.substring(0, first) : goalUrl; - Class.forName(driverName); - - List tableSqlList = readCEPSql(); - - runScript(defaultUrl, flag, tableSqlList); - } catch (Exception e) { - log.error("create database error,{}", e.getMessage()); - throw e; + String goalUrl = properties.getProperty("spring.datasource.url"); + this.user = properties.getProperty("spring.datasource.username"); + this.password = properties.getProperty("spring.datasource.password"); + String driverName = properties.getProperty("spring.datasource.driver-class-name"); + boolean flag = driverName.contains("mariadb"); + if (flag) { + databaseType = "mysql"; } + int first = goalUrl.lastIndexOf("/") + 1; + int endTag = goalUrl.indexOf("?"); + if (endTag == -1) { + endTag = goalUrl.length(); + } + this.dbName = goalUrl.substring(first, endTag); + // get mysql default url like jdbc:mysql://127.0.0.1:3306 + String defaultUrl = flag ? goalUrl.substring(0, first) : goalUrl; + log.info("dbName:{},defaultUrl:{}, {}", this.dbName, defaultUrl, databaseType); + Class.forName(driverName); + + List tableSqlList = readSql(); + runScript(defaultUrl, flag, tableSqlList); } private Properties getProperties() throws Exception { @@ -74,7 +73,7 @@ public class InitialDb implements AutoCloseable { } - private static List readCEPSql() throws IOException { + private static List readSql() throws IOException { InputStream resourceAsStream = InitialDb.class.getResourceAsStream("/cep_rule_" + databaseType + ".sql"); StringBuffer sqlBuffer = new StringBuffer(); List sqlList = new ArrayList<>(); diff --git a/weevent-processor/src/main/resources/application-dev.properties b/weevent-processor/src/main/resources/application-dev.properties index a9f4e16034e27d50e5efdfb19556815c73b0e75c..18e162963bf7103daf7f3872d4a6048fb06808ec 100644 --- a/weevent-processor/src/main/resources/application-dev.properties +++ b/weevent-processor/src/main/resources/application-dev.properties @@ -12,8 +12,17 @@ spring.http.encoding.force=true spring.messages.encoding=UTF-8 spring.pid.fail-on-write-error=true spring.pid.file=./logs/${spring.application.name}.pid + +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_processor?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_processor -spring.datasource.driverClassName=org.h2.Driver +spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root spring.datasource.password=123456 spring.datasource.type=org.apache.commons.dbcp2.BasicDataSource diff --git a/weevent-processor/src/main/resources/application-prod.properties b/weevent-processor/src/main/resources/application-prod.properties index a9f4e16034e27d50e5efdfb19556815c73b0e75c..18e162963bf7103daf7f3872d4a6048fb06808ec 100644 --- a/weevent-processor/src/main/resources/application-prod.properties +++ b/weevent-processor/src/main/resources/application-prod.properties @@ -12,8 +12,17 @@ spring.http.encoding.force=true spring.messages.encoding=UTF-8 spring.pid.fail-on-write-error=true spring.pid.file=./logs/${spring.application.name}.pid + +#mysql database config +#spring.jpa.database=mysql +#spring.datasource.url=jdbc:mysql://127.0.0.1:3306/WeEvent_processor?useUnicode=true&characterEncoding=utf-8&useSSL=false +#spring.datasource.driver-class-name=org.mariadb.jdbc.Driver +#spring.datasource.username=xxxx +#spring.datasource.password=yyyy +#h2 database config +spring.jpa.database=h2 spring.datasource.url=jdbc:h2:./WeEvent_processor -spring.datasource.driverClassName=org.h2.Driver +spring.datasource.driver-class-name=org.h2.Driver spring.datasource.username=root spring.datasource.password=123456 spring.datasource.type=org.apache.commons.dbcp2.BasicDataSource