本文共 1435 字,大约阅读时间需要 4 分钟。
启动Hadoop和Spark是数据处理的基础,以下步骤将帮助您顺利完成操作。
在终端中输入以下命令启动Spark:
spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar
这一步需要确保Spark及其依赖已经正确安装,特别是若链接到MySQL数据库,必须添加对应的JAR。
将Spark目录下的日志文件读取进来进行测试:
val alllog=sc.textFile("file:///usr/local/src/spark-1.6.1-bin-hadoop2.6/logs/*out*") 验证记录数量:
alllog.count
注意:记得检查所选日志目录路径是否正确。
将读取到的RDD格式数据转换为DataFrame:
import org.apache.spark.sql.Rowval alllogRDD = alllog.map(x => Row(x))import org.apache.spark.sql.types._val schemaString = "line"val schema = StructType( schemaString.split(" ").map(fieldName => StructField(fieldName, StringType(), true)))val alllogDataFrame = sqlContext.createDataFrame(alllogRDD, schema) 注册表并打印Schema:
alllogDataFrame.registerTempTable("log")alllogDataFrame.printSchema 显示DataFrame内容:
alllogDataFrame.show(false)
将DataFrame转换为临时表后,便可以使用SQL查询:
sqlContext.sql("SELECT * FROM log").show() 此时可以对表进行增删改查操作,方便数据处理。
读取特定文件夹下的JSON文件:
val df = sqlContext.read.format("json").load("file:///mnt/hgfs/vm/china.json")df.printSchema 保存结果:
df.select("*").write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/china.parquet") 对于包含嵌套数组的JSON文件,可以使用SQL的explode函数展开数据:
val exploded_df = sqlContext.sql("SELECT explode(array_column, ',') as column, value FROM parquet.`examples/src/main/resources/users.parquet`")exploded_df.show(false) 转载地址:http://osmjz.baihongyu.com/