KafkaConsumer is not safe for multi-threaded access-程序员宅基地

技术标签: error__problem  spark  kafka  structed streaming  multi-threaded  

1、关于structed streaming 读取kafka的问题:

数据流从kafka过来,根据条件的不同,会被拆分成好几个流式dataFrame,这些流式的DF会进行各种join操作,在这个过程中会随机的触发KafkaConsumer is not safe for multi-threaded access,kafka多线程消费的问题,本程序就只有一个线程,怎么会有多线程消费的问题。在网上各方查找,有说是spark的架构task任务问题,有说是加个cache进行RDD缓存,有说是spark的bug,最终还是没有解决,问题还是会随机的出现,并且随着流的拆分数量增多,出现的概率也越高。(spark版本是2.3.0)

java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:305)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:216)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:122)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:106)
	at org.apache.spark.util.UninterruptibleThread.runUninterruptibly(UninterruptibleThread.scala:77)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
	at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431)
	at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1132)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:305)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer.org$apache$spark$sql$kafka010$CachedKafkaConsumer$$fetchData(CachedKafkaConsumer.scala:216)
	at org.apache.spark.sql.kafka010.CachedKafkaConsumer$$anonfun$get$1.apply(CachedKafkaConsumer.scala:122)

2、解决办法:

由于问题是随机出现的,说明并不是代码的问题,可能是环境的问题或者其他外在的问题。目前找到唯一有效的办法就是提高spark的版本。将2.3.0改成了2.3.1。也不是最新版本,也不算老的版本,可能就是最好的版本。

<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>

 

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_40126236/article/details/89243918

智能推荐

5G时代探索互动立体视频信息承载的新可能-程序员宅基地

文章浏览阅读908次。5G时代对于视频行业的发展和业务形态将是一个重要的助推,但5G时代带来的改变不只是带宽提升和延迟降低这两个最直接的因素。本次LiveVideoStackCon 2020线上峰会我们邀请到..._探索从平面到三维视频信息表达的新可能

combiner优化_合路器 combiner csdn-程序员宅基地

文章浏览阅读484次。combiner为何物1. Combiner是MR程序中Mapper和Reduce之外的一种组件2. Combiner组件的父类是Reducer3. Combiner和Reducer之间的区别在于运行的位置,Combiner可以看做局部的Reducer(local reducer)4. Reducer接收全局的MapTask 所输出的结果5. Combiner在MapTask节点中运行..._合路器 combiner csdn

Cloudera Manager 5.15.2离线安装笔记(一)_cdh-5.15.2-1.cdh5.15.2.p0.3-el7.parcel-程序员宅基地

文章浏览阅读1k次。Cloudera Manager 5.15.2离线安装笔记(一)工欲善其事必先利其器,想要学好一门技术首先得有趁手的工具,要想学好大数据技术,还是得有比较好的工具才行。本笔记记录的是安装Cloudera Manager的过程。CDH的全称是Cloudera’s Distribution Including Apache Hadoop,是hadoop众多发行版本中的一种,是由Cloudera维护..._cdh-5.15.2-1.cdh5.15.2.p0.3-el7.parcel

新版Android Studio火烈鸟 在新建项目工程时 无法选java的语言模板解决方法_androidstudio没有java语言选项-程序员宅基地

文章浏览阅读2w次,点赞52次,收藏65次。最近下载最新版androidstudio时 发现不能勾选java语言模板了如果快速点击下一步 新建项目 默认是kotlin语言模板 这可能和google主推kt语言有关。_androidstudio没有java语言选项

如何用java开发一个网站?_java开发网站-程序员宅基地

文章浏览阅读1w次,点赞25次,收藏196次。问题:如何用java开发一个网站?下载了最新的JDK软件、最新的Eclipse、数据库mysql以及tomcat、struts但是不知道怎么连接起来,在数据库连接的时候mysql-connector-java-5.1.44中没有Driver.jar,tomcat配置环境的时候也有问题,tomcat plugin没有和最新的JDK配套的怎么办?看了问题,我建议题主还是好好先学一轮基础的东西。基于问题我简单提几点:Eclipse是开发工具,最新的没问题。JDK其实不需要用最新,现在市面上大多数还是JDK_java开发网站

HDU 3605 Escape(最大流+状态压缩)_acm3605题答案csdn-程序员宅基地

文章浏览阅读338次。题意:现有n个人要移居到m个星球去,给定一个n*m的矩阵,第 i 行第 j 列如果为1,表示第 i 个人可以去第 j 个星球,如果为0,表示不可以去。然后给出这m个星球都最多分别能住多少人,问你n个人是不是都能找到星球住? (1 思路:看到这个n的范围我震惊了...然后不知道怎么做了... 明显的最大流问题,不过n数目太大,直接做肯定超时. 留意到m最多有10个,所_acm3605题答案csdn

随便推点

PHPstudy出错_phpstudy 0002d806 cannot create shell notification-程序员宅基地

文章浏览阅读490次。Windows上安装PHPstudy后,会自动在iis开启php服务,而且自启动如果出错,最简单的方法就是重装,先要关掉PHPstudy占用的进程,将PHPstudy的安装目录选择到以前的安装目录,进行覆盖_phpstudy 0002d806 cannot create shell notification icon.

Vue.js devtools插件:超实用的浏览器扩展,使项目更容易地调试和优化-程序员宅基地

文章浏览阅读1.2k次,点赞11次,收藏12次。Vue.js devtools插件:超实用的浏览器扩展,使项目更容易地调试和优化_vue.js devtools

基于Matlab使用蒙特卡洛法对机器人运动空间的求解等_matlab运动学之蒙特卡罗法求积分与机器人工作域分析 秦迷天下 2023-12-08 19:-程序员宅基地

文章浏览阅读773次。基于Matlab使用蒙特卡洛法 几何解析法求解机器人运动空间_matlab运动学之蒙特卡罗法求积分与机器人工作域分析 秦迷天下 2023-12-08 19:

spark scala-transformation基础操作_spark scala transform-程序员宅基地

文章浏览阅读269次。本文章主要通过scala实现spark的transformation的常用功能:1 map算子2 flatMap算子3 sortByKey算子4 join算子5 filter算子import org.apache.spark.SparkConfimport org.apache.spark.SparkContext/** * @author jhp */object Transform..._spark scala transform

Harmony OS WiFi编程——连接热点、创建热点-程序员宅基地

文章浏览阅读938次,点赞25次,收藏19次。在Harmony OS上STA模式扫描其他WiFi热点,需要注意以下事项。可以将本仓整体拷贝到openharmony源码树下,和。接口只是触发扫描动作,并不会等到扫描完成才返回;扫描状态监听回调函数内,不能直接调用。拷贝到openharmony源码的。前两章中的方法——将当前目录下的。修改openharmony源码的。4.鸿蒙开发系统底层方向。1.项目开发必备面试题。5.鸿蒙音视频开发方向。扫描完成后要及时调用。

python not in string_每周一个 Python 模块 | string-程序员宅基地

文章浏览阅读50次。同时,也欢迎关注我的微信公众号 AlwaysBeta,更多精彩内容等你来。目的:包含用于处理文本的常量和类。string 模块可以追溯到最早的 Python 版本。先前在此模块中实现的许多功能已移至 str 对象方法。string 模块保留了几个有用的常量和类来处理 str 对象。函数 capwords()直接看下面的事例:import strings = 'The quick brown fox..._python not in string

推荐文章

热门文章

相关标签