当前位置:   article > 正文

大数据复习案例_从文件系统中加载数据创建 rdd头歌

从文件系统中加载数据创建 rdd头歌

happybase操作Hbase语法:

import happybase
## 链接HBase数据库
conn = happybase.Connection(host=’localhost’, port=9090, timeout=None, autoconnect=True, table_prefix=None, table_prefix_separator=b’_’, compat=’0.98’, transport=’buffered’, protocol=’binary’)
## 创建表
conn.create_table(
	'shop',
	{
		'interfaceInfo' :dict(max_version=4),
		'inputInfo' :dict(max_version = 4)
	}
)
#插入
table = conn.table("shop")
with table.batch() as bat:
	bat.put('0001',{'interfaceInfo:inter_show':'HDM1', 'interfaseInfo:inter_network':'10Mbps', 'interfaceInfo:inter_three':'1个','interfaceInfo:inter_Type-c':'1个'})
	bat.put('0001',{'inputInfo:input_one':'有指点杆','inputInfo:input_tow':'全尺寸键盘','inputInfo:input_three':'多点触控','inputInfo:input_four':'多点触控'})

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

MapReduce语法(变压器案例为例)

mapper.py
#! /usr/bin/python3
# mapper处理之后的结果会自动按照key排序
import sys

def mapper(line):
    key = float(line.split(",")[2])

    cat = ''        
    if key <= 630.00:
        cat = "mini"
    elif key <= 6300:
        cat = "mid"
    else:
        cat = "over"
    print("%s\t%s" % (cat, 1))



def main():
    for line in sys.stdin:
        line = line.strip()
        if line.startswith('child'):
        	break
        else:
            mapper(line)

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
reduce.py
#! /usr/bin/python3

import sys

def reducer(k, values):
    print("%s:\t:%s" % (k, sum(values)))

def main():
    current_key = None
    values = []
    akey, avalue = None, None

    for line in sys.stdin:
        line = line.strip()
      
        try:
            akey, avalue = line.split('\t')
        except:
            continue
        if current_key == akey:
            values.append(int(avalue))
        else:
            if current_key:
                reducer(current_key, values)
                values = []
            values.append(int(avalue))
            current_key = akey
    if current_key == akey:
        reducer(current_key, values)

if __name__ == '__main__':
    main()

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

Spark语法

两种方式创建rdd
# 1.初始化 SparkContext,该对象是 Spark 程序的入口,‘Simple App’是程序的名字,一般自定义要通俗易懂
    sc = SparkContext("local", "Simple App")
# 如果是使用自己写入的文件的话,就用parallelize创建rdd
# 2.创建一个1到5的列表List
    data = [i for i in range(1,6)]

# 3.通过 SparkContext 并行化创建 rdd
    rdd = sc.parallelize(data)
# 如果是使用外部文件创建rdd的话,按照如下语句
   rdd = textFile("/root/wordcount.txt")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
接着就是用所需要的算子来完成任务

算子部分edu的网址如下:
https://www.educoder.net/shixuns/imf67y2q/challenges

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/749270
推荐阅读
相关标签
  

闽ICP备14008679号