python调用接口数据写入hive

栏目: 服务器 · 发布时间: 5年前

该方法使用场景为:在hadoop集群进行接口调用,并且获取接口返回值进行解析,解析完成数据写入hive表

其中存在的问题:测试环境和线上环境的一致性,还有接口调用不能一次性并发太高,自己把握这个量

分模块说一下每个模块需要做的:

1、调用get接口:

请求本地接口进行测试,参数需要encode

# -*- coding: utf-8 -*-

import urllib

import urllib2



# get接口调用

get_url ="http://localhost:7000/api/test"



get_params = {"startDate":20180729,"endDate":20180729}



get_params_urlencode = urllib.urlencode(get_params)



get_params_url = get_url+'?'+get_params_urlencode



get_requst = urllib2.Request(url=get_params_url)



get_res_data = urllib2.urlopen(get_requst)



get_res = get_res_data.read()



print get_res;

2、调用post接口:

这个操作复杂一点,加了从文件读取参数,参数是作为list入参

每次读取20个id,循环调用post接口,获取json结果进行解析

# -*- coding: utf-8 -*-



import json

import urllib2

import time



file_dir = "/user/tmp/ids"

post_url = "http://localhost:7000/api/users"



# 读取文本文件获取id列表数据

file = open(file_dir)

data_list = file.readlines()

# 获取id循环次数,除数-1,结果加+1

n = (data_list.__len__() - 1) / 20 + 1

# 初始化list下标

j = 1

while j <= n:

    id_lists = data_list[(j - 1) * 20:j * 20]

    ids = []

    for id in id_lists:

        ids.append(id.strip())

    j += 1

    print "Start : %s" % time.ctime()

    # 拼接参数值

    params = {"staticDate": 20180926, "ids": ids}

    # 参数json格式化

    json_params = json.dumps(params)

    # 组装request请求

    post_requst = urllib2.Request(url=post_url, data=json_params)



    post_requst.add_header('Content-Type', 'application/json')



    post_requst.add_header('Accept', 'application/json')

    # 发起http请求,并获取返回结果

    res = urllib2.urlopen(post_requst)



    # 把http返回的json数据转为 python 对象,

    result = json.loads(res.read())



    result1 = result["data"]



    print result1

    for id in ids:

        print 'id:' + id + '-> name:' + str(result1[id]['name'])

    # 休息20s再继续处理

    time.sleep(20)

    print "End : %s" % time.ctime()



file.close()


3、把获取到的解析结果写入文件:

一般来说写入文件之前需要校验一下文件是否存在,不存在的话新建一个空文件,写个方法校验一下:

# 判断输出文件存不存在,不存在新建,存在删除后新建

def get_file(filename):

    # print os.path.abspath('.')

    if not os.path.isfile(filename):

        print 'file not exists! create file :'+filename

        f = open(filename,'w')

        f.close()

        return 1

    else:

        print 'file exists! starting delete history file !'

        os.remove(filename)

        f = open(filename,'w')

        f.close()

        print 'file created file !'+filename

        return 1

后面写入文件的时候有两种模式,w(w+) 写文件(读写)并且会每次写的时候覆盖 ,a(a+) 写文件(读写)每次写只会在文件后面追加。

下面这个就是往文件追加内容,最好追加之前调用一次get_file()进行文件清空,代码没有贴全,自己可以补充一下

with open(filename, 'a') as f:

    for id in ids: 

        f.write(str(id).strip()+"\t"+str(result1[id]['name']).strip()+"\n")

        f.flush()

f.close()

4、python连接hive-server

可以参考: https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients#HiveServer2Clients-PythonClient

定一个连接hive的方法,用账号秘密进行连接,自己定义 sql 语句:

# 初始化hive联接

def get_hive_conn():

    conn = pyhs2.connect(host='10.10.10.10',

                         port=9090,

                         authMechanism="PLAIN",

                         user='user',

                         password='password',

                         database='db_name')

    return conn

# 执行hive操作语句

def get_result(load_file, dt):

    conn = get_hive_conn()

    hive_sql = "select * from tablename"

    print hive_sql

    with conn.cursor() as cur:

        # Show databases

        # print cur.getDatabases()

        # Execute query

        cur.execute(hive_sql)

        # result = cur.fetchall()

        # print result

        # # Return column info from query

        print cur.getSchema()

        #

        # # Fetch table results

        for i in cur.fetch():

            print i

        cur.close();

    conn.close()

5、 shell 组装模块进行调度

因为还需要操作hdfs文件,shell脚本在我目前的环境是比较方便的,所以需要组装一下python各个模块

  • a、调用接口模块 GetData.py
  • b、写入文本文件的模块 WriteFile.py(调用接口的模块和写入文件可以写一起)
  • c、连接hive的模块 ToHive.py

shell接受 python 参数:

result=`python test.py` 

获取test.py的print语句返回的值

python接受shell参数:

python mta_hive.py ${a}  ${b}

需要在python脚本内引用sys包,python脚本获取参数a = sys.argv[1]   b= sys.argv[2]

shell脚本的大概代码:

#激活python环境

source ./bin/activate

#参数

input="/user/tmp/inputfile.txt"

output="/user/tmp/output.txt"

filename=basename ${output}

hdfs_path="hdfs:///user/temp"

#执行python脚本调用接口,解析返回结果写入文件

result=`python WriteFile.py "${input}" "${output}"`

#把文件put到hdfs上

$HADOOP_HOME/bin/hadoop fs -put -f ${result} ${hdfs_path}

hdfsfile=${hdfs_path}${filename}

#执行hive操作,把文件load到hive表

python ToHive.py ${hdfs_file}

一般来说写入load到hive表到语句是:

load data local inpath '/home/temp/test.txt' overwrite into table db_name.tablename PARTITION (dt=20180926); 

但是如果是读取hdfs文件,需要把local去掉,再附上建表语句:

CREATE EXTERNAL TABLE IF NOT EXISTS dbname.tablename (

id bigint comment 'id',

name string comment '名称'

)COMMENT'测试'

PARTITIONED BY(dt string)

ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'

STORED AS TEXTFILE

LOCATION '/user/temp/test'

;

写的比较乱,有时间再梳理一下。

python调用接口数据写入hive

本文由brucelu 创作,采用 知识共享署名-相同方式共享 3.0 中国大陆许可协议 进行许可。

转载、引用前需联系作者,并署名作者且注明文章出处。

本站文章版权归原作者及原出处所有 。内容为作者个人观点, 并不代表本站赞同其观点和对其真实性负责。本站是一个个人学习交流的平台,并不用于任何商业目的,如果有任何问题,请及时联系我们,我们将根据著作权人的要求,立即更正或者删除有关内容。本站拥有对此声明的最终解释权。


以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Uberland

Uberland

Alex Rosenblat / University of California Press / 2018-11-19 / GBP 21.00

Silicon Valley technology is transforming the way we work, and Uber is leading the charge. An American startup that promised to deliver entrepreneurship for the masses through its technology, Uber ins......一起来看看 《Uberland》 这本书的介绍吧!

JS 压缩/解压工具
JS 压缩/解压工具

在线压缩/解压 JS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

HSV CMYK 转换工具
HSV CMYK 转换工具

HSV CMYK互换工具