Python常用脚本整理

Log日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
标准日志配置
"""
import os
import logging
import logging.handlers
def init_log(log_path, level=logging.INFO, when="D", backup=7,
             format="%(levelname)s: %(asctime)s: %(filename)s:%(lineno)d %(message)s",
             datefmt="%m-%d %H:%M:%S"):
    """Configure the root logger to write to two time-rotated log files.

    Args:
        log_path: Log file path prefix. Records at or above ``level`` go to
            ``log_path + ".log"``; records at WARNING or above are
            additionally written to ``log_path + ".log.wf"``. Missing parent
            directories are created automatically.
        level: Threshold applied to the root logger and to the main handler
            (DEBUG < INFO < WARNING < ERROR < CRITICAL). Defaults to
            ``logging.INFO``.
        when: Rotation interval unit handed to TimedRotatingFileHandler:
            'S' seconds, 'M' minutes, 'H' hours, 'D' days,
            'W0'-'W6' a weekday (0 = Monday). Defaults to 'D'.
        backup: Number of rotated files each handler keeps. Defaults to 7.
        format: Record format string. With the default a record looks like
            ``INFO: 12-09 18:02:42: log.py:40 HELLO WORLD``.
        datefmt: strftime pattern used for ``%(asctime)s``.

    Raises:
        OSError: The log directory could not be created.
        IOError: A log file could not be opened.
    """
    formatter = logging.Formatter(format, datefmt)
    logger = logging.getLogger()
    logger.setLevel(level)

    # dirname() is "" for a bare file name such as "app"; makedirs("") would
    # raise, so only create a directory when the path actually has one.
    log_dir = os.path.dirname(log_path)
    if log_dir and not os.path.isdir(log_dir):
        os.makedirs(log_dir)

    # One handler for everything at `level` and above, one for warnings only.
    for suffix, handler_level in ((".log", level), (".log.wf", logging.WARNING)):
        handler = logging.handlers.TimedRotatingFileHandler(
            log_path + suffix, when=when, backupCount=backup)
        handler.setLevel(handler_level)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

日志初始化样例:

1
2
3
4
5
6
7
8
9
import log
import os
import time

# Initialise logging with one log-file prefix per calendar day.
# time.strftime() yields the same local-time YYYYMMDD stamp as running
# `date +%Y%m%d`, without spawning a shell or leaving a pipe unclosed.
cur_time = time.strftime("%Y%m%d")
log_path = "./log/sample" + cur_time
log.init_log(log_path)

Http请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import urllib2
import sys

# Python 2-only idiom: reload(sys) makes sys.setdefaultencoding available
# again so the process-wide default str<->unicode codec can be set to UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')
# NOTE(review): imports the package's __init__ module by name, presumably for
# its import-time side effects -- confirm it is still needed.
import __init__
import logging

# Passed as the ``timeout`` argument of every urllib2.urlopen() call in this
# module (urlopen interprets it in seconds).
TIME_OUT = 60


def getHeader(request, headers):
    """Copy every entry of ``headers`` onto ``request`` as an HTTP header.

    Args:
        request: Object exposing ``add_header(name, value)``, such as a
            ``urllib2.Request``.
        headers: Mapping of header name to header value. ``None`` or an
            empty mapping adds nothing.

    Returns:
        The same ``request`` object that was passed in.
    """
    for name, value in (headers or {}).items():
        # Send the trimmed name, but take the value from the key as given:
        # looking it up under the stripped key raised KeyError for names
        # that carried surrounding whitespace.
        request.add_header(name.strip(), value)
    return request


def get(url, headers=None):
    """Issue an HTTP GET and return the response body.

    Args:
        url: Address to fetch.
        headers: Optional dict of request headers, for example
            ``{'User-Agent': user_agent}``. Defaults to no extra headers.

    Returns:
        The response body, or ``None`` if any exception was raised while
        building or sending the request (the error is logged).
    """
    try:
        request = urllib2.Request(url)
        # `headers or {}` replaces the former shared mutable default.
        request = getHeader(request, headers or {})
        response = urllib2.urlopen(request, timeout=TIME_OUT)
        try:
            # Kept from the original: on a Python 2 byte string encode()
            # round-trips through the process default encoding.
            return response.read().encode('utf-8')
        finally:
            # Release the socket even when read() fails.
            response.close()
    except Exception as e:
        logging.error(str(e))
        return None

def post(url, headers, body):
    """Issue an HTTP POST and return the response body.

    Args:
        url: Address to post to.
        headers: Dict of request headers, for example
            ``{'User-Agent': user_agent}``. The Content-Type header tells
            the server how to interpret ``body``.
        body: Request payload as a string. An empty value or the literal
            ``"{}"`` sends the request without a payload.

    Returns:
        The response body, or ``None`` if any exception was raised while
        building or sending the request (the error is logged).
    """
    try:
        request = urllib2.Request(url)
        request = getHeader(request, headers or {})
        # Force the verb: without data urllib2 would otherwise send a GET.
        request.get_method = lambda: 'POST'
        if body and body != "{}":
            request.add_data(body)
        response = urllib2.urlopen(request, timeout=TIME_OUT)
        try:
            # Kept from the original: on a Python 2 byte string encode()
            # round-trips through the process default encoding.
            return response.read().encode('utf-8')
        finally:
            # Release the socket even when read() fails.
            response.close()
    except Exception as e:
        logging.error(str(e))
        return None


def delete(url, headers):
    """Issue an HTTP DELETE and return the response body.

    Args:
        url: Address of the resource to delete.
        headers: Dict of request headers, for example
            ``{'User-Agent': user_agent}``.

    Returns:
        The response body, or ``None`` if any exception was raised while
        building or sending the request (the error is logged).
    """
    try:
        request = urllib2.Request(url)
        request = getHeader(request, headers or {})
        # urllib2 has no native DELETE support; override the verb lookup.
        request.get_method = lambda: 'DELETE'
        response = urllib2.urlopen(request, timeout=TIME_OUT)
        try:
            # Kept from the original: on a Python 2 byte string encode()
            # round-trips through the process default encoding.
            return response.read().encode('utf-8')
        finally:
            # Release the socket even when read() fails.
            response.close()
    except Exception as e:
        logging.error(str(e))
        return None

Mysql连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import MySQLdb

def exe_sql(sql, type=0, args=None):
    """Run one SQL statement on a fresh connection and return rows as tuples.

    Args:
        sql: SQL statement. May contain ``%s`` placeholders when ``args`` is
            supplied.
        type: Unused; kept so existing callers keep working.
        args: Optional parameters bound to the placeholders by the driver.
            Prefer this over building ``sql`` by string concatenation when
            values come from outside the program.

    Returns:
        The result of ``fetchall()`` (a sequence of row tuples), or ``None``
        if a ``MySQLdb.Error`` occurred (the error is printed).
    """
    conn = None
    try:
        # NOTE(review): host/user/passwd/db/port are not defined in this
        # snippet; they are assumed to be module-level settings.
        conn = MySQLdb.connect(host, user, passwd, db, port)
        cur = conn.cursor()
        try:
            cur.execute(sql, args)
            results = cur.fetchall()
            conn.commit()
            return results
        finally:
            cur.close()
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        return None
    finally:
        # Close the connection on the error path too, not only on success.
        if conn is not None:
            conn.close()

def exe_sql_return_dict(sql, type=0, args=None):
    """Run one SQL statement on a fresh connection and return rows as dicts.

    Args:
        sql: SQL statement. May contain ``%s`` placeholders when ``args`` is
            supplied.
        type: Unused; kept so existing callers keep working.
        args: Optional parameters bound to the placeholders by the driver.
            Prefer this over building ``sql`` by string concatenation when
            values come from outside the program.

    Returns:
        The result of ``fetchall()`` on a ``DictCursor`` (a sequence of
        column-name -> value dicts), or ``None`` if a ``MySQLdb.Error``
        occurred (the error is printed).
    """
    # Import the submodule explicitly instead of relying on connect() having
    # loaded it as a side effect.
    import MySQLdb.cursors

    conn = None
    try:
        # NOTE(review): host/user/passwd/db/port are not defined in this
        # snippet; they are assumed to be module-level settings.
        conn = MySQLdb.connect(host, user, passwd, db, port)
        cur = conn.cursor(MySQLdb.cursors.DictCursor)
        try:
            cur.execute(sql, args)
            results = cur.fetchall()
            conn.commit()
            return results
        finally:
            cur.close()
    except MySQLdb.Error as e:
        print("Mysql Error %d: %s" % (e.args[0], e.args[1]))
        return None
    finally:
        # Close the connection on the error path too, not only on success.
        if conn is not None:
            conn.close()

Main函数

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import json
# Python 2-only idiom: re-expose sys.setdefaultencoding and switch the
# process-wide default codec to UTF-8.
reload(sys)
sys.setdefaultencoding('utf8')


if __name__ == '__main__':
    # Echo standard input, one line at a time, with the surrounding
    # whitespace of each line removed.
    for raw_line in sys.stdin:
        cleaned = raw_line.strip()
        print(cleaned)

Write文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import json

def write_to_file(filepath, content, mode="a+"):
    """Write ``content`` to ``filepath`` exactly as given.

    No newline is appended. The file is opened with ``mode``, which
    defaults to ``"a+"`` so successive calls append.
    """
    out = open(filepath, mode)
    try:
        out.write(content)
    finally:
        out.close()

def write_pretty(filepath, content):
    """Append ``content`` to ``filepath`` as indented, key-sorted JSON.

    Args:
        filepath: File to append to; created if it does not exist.
        content: Any JSON-serialisable object.

    The document is written with a 4-space indent and followed by a newline,
    so that documents appended by later calls do not run into each other.
    """
    with open(filepath, 'at') as fw:
        # json.dump() writes to the stream and returns None, so there is
        # nothing to keep from the call.
        json.dump(content, fw, sort_keys=True, indent=4, separators=(',', ': '))
        fw.write("\n")

Read配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#!/usr/bin/env python
# -*- coding:utf-8 -*-
"""
读取配置文件
"""
import ConfigParser
import string
import logging
import log

class Parser(object):
    """Read-only helper around a ``ConfigParser`` loaded from one file."""

    def __init__(self, configfile):
        """Parse ``configfile``; an unreadable file leaves the parser empty."""
        self.cf = ConfigParser.ConfigParser()
        # read() skips files it cannot open and returns the list it parsed,
        # so an empty result means nothing was loaded.
        if not self.cf.read(configfile):
            logging.warning("config file %s could not be read", configfile)

    def get_all_sections(self):
        """Return the names of all sections in the file."""
        return self.cf.sections()

    def list_options(self, section):
        """Return the option names (keys) defined in ``section``."""
        return self.cf.options(section)

    def get_key(self, section, key):
        """Return the value of ``key`` in ``section``.

        Returns:
            The value as a string, or ``None`` when the value is empty or
            when the section/option lookup fails (the failure is logged).
        """
        try:
            value = self.cf.get(section, key)
        except ConfigParser.Error as e:
            # ConfigParser.Error is the base of the missing-section,
            # missing-option and interpolation errors raised by get().
            logging.error("%s in %s doesn't exist or can't be read: %s",
                          key, section, e)
            return None
        return value or None


if __name__ == '__main__':
    # Demo: dump every section of host.conf with its key/value pairs.
    conf = Parser("host.conf")
    for section in conf.get_all_sections():
        print(section)
        for key in conf.list_options(section):
            # get_key() returns None for an empty option; %-formatting
            # prints it, where string concatenation raised TypeError.
            print("%s : %s" % (key, conf.get_key(section, key)))

host.conf内容如下:

1
2
3
4
5
6
7
8
9
[CHINA]
provice=Beijing
city=Beijing
district=Haidian

[USA]
provice=State of New York
city=NewYork
district=B13

脚本输出内容如下:

1
2
3
4
5
6
7
8
CHINA
provice : Beijing
city : Beijing
district : Haidian
USA
provice : State of New York
city : NewYork
district : B13

Http服务

SimpleHTTPServer

一键启动一个http服务,可以快速满足文件下载等需求。

1
python -m SimpleHTTPServer [端口号]    # Python 2;Python 3 请使用: python3 -m http.server [端口号]

Flask

Flask是一个基于Python的第三方轻量级Web框架(需先执行 pip install flask 安装),可以便捷地搭建http服务,简单高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import flask
from flask import Flask
from flask import make_response,render_template,Response
import time
import hashlib

app = Flask(__name__)


@app.route('/')
def index():
    """Serve the current Unix timestamp and an MD5 token derived from it.

    The token is the hex MD5 digest of a fixed prefix, the timestamp and a
    fixed suffix, concatenated in that order.
    """
    now = str(int(time.time()))
    print(now)
    # NOTE(review): the trailing literal looks like a signing secret that is
    # hard-coded in source -- consider loading it from configuration.
    digest_input = "bigdata-meta" + now + "z2spmoowoeao2bzvqs796a8t08tszso0"
    md5_hex = hashlib.md5(digest_input.encode('utf8')).hexdigest()
    return "timestamp: %s <br/><br/> token: %s" % (now, md5_hex)

@app.route('/callback')
def read():
    """Handle ``/callback``: log the hit and answer with the body ``"0"``."""
    print("callback")
    # A Flask view must return a string, tuple, or response object; the bare
    # int 0 returned before made Flask raise TypeError on every request.
    return "0"

if __name__ == "__main__":
    import os

    # The interactive debugger lets anyone who can reach the port execute
    # arbitrary code, and this server listens on every interface. Keep it
    # off unless explicitly requested with FLASK_DEBUG=1.
    debug_enabled = os.environ.get("FLASK_DEBUG") == "1"
    app.run(host='0.0.0.0', debug=debug_enabled, port=8343)

单元测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
对util模块进行测试
Authors: austin
Date: 16/7/12 17:24.
"""

import sys
import os
import unittest

import util

class UtilTestCase(unittest.TestCase):
    """Test case for the ``util`` module."""

    def setUp(self):
        """Prepare the input strings and the URLs expected for them."""
        self.cs_list = [
            '3182242396,2726385067,72',
            '4030018123,3852097821,72',
        ]
        self.expect_url_list = [
            'http://t10.baidu.com/it/u=3182242396,2726385067&fm=72',
            'http://t10.baidu.com/it/u=4030018123,3852097821&fm=72',
        ]

    def test_get_cs_url(self):
        """util.get_cs_url maps each input string to its expected URL."""
        actual_urls = [util.get_cs_url(cs) for cs in self.cs_list]
        self.assertEqual(actual_urls, self.expect_url_list)


if __name__ == '__main__':
    # Run every test case defined in this module when executed as a script.
    unittest.main()