当要将dataframe进行序列化(df.show()或者df.collect())时,报这个错误。
原因是:maven的pom.xml中含有spark 和 kafka。
1.spark2.3用到了lz4-1.3.0.jar,kafka0.9.0.1用到了lz4-1.2.0.jar,而程序运行时使用的是lz4-1.3.0.jar。
2.lz4-1.3.0.jar包中net.jpountz.util.Utils 类中没有checkRange,该方法位于net.jpountz.util.SafeUtils和net.jpountz.util.UnsafeUtils
原文:https://blog.csdn.net/m0_37914799/article/details/84992275
可以通过:mvn dependency:tree|less查看冲突的包。
解决办法:
通过修改pom.xml
中·kafka-client·依赖,exclude掉lz4的依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<exclusions>
<exclusion>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
</exclusion>
</exclusions>
</dependency>
原因:这个错误是Hadoop版本的fasterxml版本较低,spark的较高,
解决方法:在maven的pom.xml文件中,记住一定要在 中加入以下库。
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala\_2.</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1..Final</version>
</dependency>
IDEA中JDK版本和Scala不匹配,我的JDK默认为IDEA自带的JDK12,所以我换成了JDK1.8,就可以了
还是版本问题:原来我用的是2.2.1,然后写Scalatest时会报这个错误
import com.google.common.io.Files
import java.io.File
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.FunSuite
class UrlDmpDataProcessJobTest extends FunSuite with SharedSparkSession {
val testDir = new File(".").getCanonicalPath + "/test/url"
var output: File = _
override def beforeAll() {
super.beforeAll()
output = Files.createTempDir()
}
override def afterAll() {
super.afterAll()
FileUtils.deleteDirectory(output)
}
test("json parse") {
import testImplicits._
val jsonFile = testDir + "/dmp-json-data.txt"
val jsonData = spark.read.json(jsonFile).toDF()
val jsonDf = jsonData.as[UrlLoadData].filter(_ != null)
jsonDf.show()
val outputDf = UrlDmpDataParse.getJsonParseData(spark, jsonDf)
outputDf.show()
val urls = outputDf.select("urls").collect()
val pid = outputDf.select("pid").collect()
val uid = outputDf.select("uid").collect()
assert(pid === Array(Row(""),Row("")))
assert(uid === Array(Row("1bd55eb51c80733a994d853b98f46ce1"),Row("f34c9dc9f0b53cdd0972d50445759d45")))
}
}
解决办法:
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest\_${scala.binary.version}</artifactId>
<version>3.0.</version>
<scope>test</scope>
</dependency>
手机扫一扫
移动阅读更方便
你可能感兴趣的文章