Fetch the repository succeeded.
This action will force synchronization from supermy/kafka-spark-redis, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
时间片段数量统计 时间片段筛选
Redis 最大存储量的限制
1. 需要 jdk8环境;
2. 配置 kafka 环境;
3. 运行示例1
4. 运行示例2
brew install kafka redis
1.先启动zookeeper服务,端口确保2181.
2.kafka-server-start.sh /usr/local/etc//kafka/server.properties
3.创建一个主题
创建,
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
获取,
kafka-topics.sh --list --zookeeper localhost:2181
4.发送一些消息: 在控制台输入消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test
5.启动一个消费者:观察在数据台显示消息
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Topic的分区和复制
1. 创建debugo01,这个topic分区数为3,复制为1(不复制)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 1 --partitions 3 --topic debugo01
kafka-topics.sh --create --zookeeper localhost --replication-factor 1 --partitions 3 --topic debugo01
2. 创建debugo02,这个topic分区数为1,复制为3(每个主机都有一份)。该topic跨越全部broker。下面管理命令在任意kafka节点上执行即可
需要三台主机
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 3 --partitions 1 --topic debugo02
3. 列出topic信息
kafka-topics.sh --list --zookeeper localhost:2181
4. 列出topic描述信息
kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo01
5. 检查log目录,对于topic debugo01,debugo01为0号分区,debugo02为1号分区。而topic debugo02则复制了3份,都为0号分区
6. 下面topic debugo03,replication-factor为2,partition为3.那么broker id为1的debugo01会如下面describe所示,保存0号分区和1号分区。
而0号分区的repica leader为broker id = 3,包含3和1两个replicas。
kafka-topics.sh --create --zookeeper debugo01,debugo02,debugo03 --replication-factor 2 --partitions 3 --topic debugo03
kafka-topics.sh --describe --zookeeper localhost:2181 --topic debugo03
ll /var/kafka/debugo03*
消息的产生和消费
kafka-console-producer.sh --broker-list debugo01:9092 --topic debugo03
kafka-console-consumer.sh --zookeeper debugo01:2181 --from-beginning --topic debugo03
7.性能测试
4线程
生产数据:50w/12sec 4w/sec
消费数据:150/27sec 5w/sec
下面使用perf命令来测试几个topic的性能,需要先下载kafka-perf_2.10-0.8.1.1.jar,并拷贝到kafka/libs下面。
50W条消息,每条1000字节,batch大小1000,topic为debugo01,4个线程(message size设置太大需要调整相关参数,否则容易OOM)。只用了13秒完成,kafka在多分区支持下吞吐量是非常给力的。
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo01 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
同样的参数测试debugo02, 由于但分区加复制(replicas-factor=3),用时39秒。所以,适当加大partition数量和broker相关线程数量会极大的提高性能。
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo02 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
同样的参数测试debugo03,用时30秒。
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo03 --threads 4 --broker-list debugo01:9092,debugo02:9092,debugo03:9092
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo01 --threads 3
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo02 --threads 3
kafka-consumer-perf-test.sh --zookeeper debugo01,debugo02,debugo03 --messages 500000 --topic debugo03 --threads 3
单机测试
kafka-producer-perf-test.sh --messages 500000 --message-size 1000 --batch-size 1000 --topics debugo01 --threads 4 --broker-list localhost:9092
kafka-consumer-perf-test.sh --zookeeper localhost --messages 500000 --topic debugo01 --threads 3
mvn scala:compile
spark-streaming-kafka 包:支持向 kafka 传送数据
//topic 与 随机数 构造消息
val message = new ProducerRecord[String, String](topic, null, str)
//发送消息到 kafka
producer.send(message)
运行时编辑configuration,
(1)KafkaWordCountProducer
选择KafkaWordCount.scala中的KafkaWordCountProducer方法
VM options 设置为:-Dspark.master=local
设置程序输入参数,Program arguments: localhost:9092 test 3 5
从 zookeeper 获取地址消费数据,此方式已被废弃
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) //创建流并且获取数据
生产数据
val producer = new Producer[String, String](kafkaConfig)
producer.send(new KeyedMessage[String, String](topic, event.toString))
消费数据,目前采用的方式快速且好用资源较少
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
1. source /etc/profile
2.
3. grapher=`ps -ef | grep spark |grep SparkStreaming.jar | awk '{print $2}'`
4. echo $grapher
5.
6. kill -9 $grapher
7.
8. nohup /opt/modules/spark/bin/spark-submit \
9. --master spark://127.0.0.1:7077 \
10. --driver-memory 3g \
11. --executor-memory 3g \
12. --total-executor-cores 24 \
13. --conf spark.ui.port=56689 \
14. --jars /opt/bin/sparkJars/kafka_2.10-0.8.2.1.jar,/opt/bin/sparkJars/spark-streaming-kafka_2.10-1.4.1.jar,/opt/bin/sparkJars/metrics-core-2.2.0.jar,/opt/bin/sparkJars/mysql-connector-java-5.1.26-bin.jar,/opt/bin/sparkJars/spark-streaming-
15. kafka_2.10-1.4.1.jar \
16. --class com.hexun.streaming.StockCntSumKafkaLPcnt \
17. /opt/bin/UDF/SparkStreaming.jar \
18. >/opt/bin/initservice/stock.log 2>&1 & \
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。