跳到主要内容

python

python http服务器

import http.server
import socketserver


# TCP port the static-file server listens on.
PORT = 80

# Request handler class instantiated for each incoming connection.
Handler = http.server.SimpleHTTPRequestHandler

# An empty host string is passed as the bind address. Leaving the ``with``
# block closes the listening socket.
with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("serving at port", PORT)
    # Blocks here, handling requests until the process is interrupted.
    httpd.serve_forever()

python 获取本机 IP

import socket
def get_host_ip():
    """Return the local IP address the OS picks for outbound traffic.

    A UDP socket is connected to a public address (8.8.8.8:80) and the
    local address the socket got bound to is read back with
    ``getsockname()``.

    Returns:
        str: The host part of the socket's local address.

    Raises:
        OSError: If the socket cannot be created or connected (for
            example when there is no route to the target address).
    """
    # The context manager closes the socket on every exit path. If socket
    # creation itself fails there is nothing to close, and the original
    # OSError propagates unchanged.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]

python 后台进程

#!/usr/bin/env python3

import os
import sys
import atexit
import signal


def daemonize(pidfile, *, stdin='/dev/null',
              stdout='/dev/null',
              stderr='/dev/null'):
    """Detach the current process and keep running in the background.

    Forks twice with a ``setsid()`` in between, redirects the standard
    streams, writes the PID file and installs cleanup handlers. Only the
    final child returns from this function; both intermediate parents
    leave via ``SystemExit(0)``.

    Args:
        pidfile: Path of the PID file. If it already exists the daemon is
            considered to be running.
        stdin: File opened for reading and duplicated onto standard input.
        stdout: File opened for appending and duplicated onto standard
            output.
        stderr: File opened for appending and duplicated onto standard
            error.

    Raises:
        RuntimeError: If ``pidfile`` already exists, or if a fork fails
            (the underlying ``OSError`` is chained as the cause).
    """
    if os.path.exists(pidfile):
        raise RuntimeError('Already running')

    # First fork (detaches from parent)
    try:
        if os.fork() > 0:
            raise SystemExit(0)   # Parent exit
    except OSError as e:
        # Keep the OS-level reason (errno) attached to the error.
        raise RuntimeError('fork #1 failed.') from e

    os.chdir('/')
    os.umask(0)
    os.setsid()

    # Second fork (relinquish session leadership)
    try:
        if os.fork() > 0:
            raise SystemExit(0)
    except OSError as e:
        raise RuntimeError('fork #2 failed.') from e

    # Flush I/O buffers
    sys.stdout.flush()
    sys.stderr.flush()

    # Replace file descriptors for stdin, stdout, and stderr
    with open(stdin, 'rb', 0) as f:
        os.dup2(f.fileno(), sys.stdin.fileno())
    with open(stdout, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stdout.fileno())
    with open(stderr, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stderr.fileno())

    # Write the PID file
    with open(pidfile, 'w') as f:
        print(os.getpid(), file=f)

    # Arrange to have the PID file removed on exit/signal
    atexit.register(os.remove, pidfile)

    # Signal handler for termination (required): turning SIGTERM into
    # SystemExit lets the atexit cleanup above run.
    def sigterm_handler(signo, frame):
        raise SystemExit(1)

    signal.signal(signal.SIGTERM, sigterm_handler)

def main():
    """Write the PID once, then a heartbeat line every 10 seconds, forever.

    Output goes to ``sys.stdout``. The function never returns.
    """
    import time

    started = 'Daemon started with pid {}\n'.format(os.getpid())
    sys.stdout.write(started)
    while True:
        heartbeat = 'Daemon Alive! {}\n'.format(time.ctime())
        sys.stdout.write(heartbeat)
        time.sleep(10)


if __name__ == '__main__':
    # Command-line entry point: ``start`` daemonizes and runs main(),
    # ``stop`` sends SIGTERM to the PID recorded in the PID file.
    PIDFILE = '/tmp/daemon.pid'
    # One log file shared by stdout and stderr.
    LOGFILE = '/tmp/daemon.log'

    if len(sys.argv) != 2:
        print('Usage: {} [start|stop]'.format(sys.argv[0]), file=sys.stderr)
        raise SystemExit(1)

    command = sys.argv[1]

    if command == 'start':
        try:
            daemonize(PIDFILE,
                      stdout=LOGFILE,
                      stderr=LOGFILE)
        except RuntimeError as e:
            print(e, file=sys.stderr)
            raise SystemExit(1)

        main()

    elif command == 'stop':
        if os.path.exists(PIDFILE):
            with open(PIDFILE) as f:
                os.kill(int(f.read()), signal.SIGTERM)
        else:
            print('Not running', file=sys.stderr)
            raise SystemExit(1)

    else:
        print('Unknown command {!r}'.format(command), file=sys.stderr)
        raise SystemExit(1)

参考:https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p14_launching_daemon_process_on_unix.html

tornado 异步demo

from re import T
import tornado.ioloop
import tornado.web
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor


class MainHandler(tornado.web.RequestHandler):
    """Demo handler whose POST offloads a blocking call to a thread pool."""

    # Thread pool used by the @run_on_executor-decorated method below.
    executor = ThreadPoolExecutor(max_workers=2)

    def get(self):
        """Answer GET with a fixed greeting."""
        self.write("Hello, world")

    @run_on_executor
    def aa(self):
        """Run a blocking ``ping`` via ``os.system`` and return the host name."""
        import os

        os.system('ping qq.com')
        return 'qq.com'

    @tornado.gen.coroutine
    def post(self):
        """Print two request arguments, wait for ``aa()``, then finish."""
        for argument_name in ('aa121', 'ff'):
            print(self.get_argument(argument_name, None))

        # Yielding the future suspends this coroutine until aa() completes.
        yield self.aa()
        self.finish()

    def put(self):
        """Answer PUT with the literal body "222"."""
        self.write("222")

    def delete(self):
        """Answer DELETE with the literal body "333"."""
        self.write("333")


def make_app():
    """Build the Tornado application: ``MainHandler`` at "/", debug enabled."""
    routes = [
        (r"/", MainHandler),
    ]
    return tornado.web.Application(routes, debug=True)


if __name__ == "__main__":
    # Build the app, listen on port 8888 and run the IOLoop until stopped.
    app = make_app()
    app.listen(8888)
    io_loop = tornado.ioloop.IOLoop.current()
    io_loop.start()

python 定时任务

from apscheduler.schedulers.blocking import BlockingScheduler

def test(**kwargs):
    """Placeholder job callback: accepts any keyword arguments, does nothing."""
    pass

# Scheduler instance; the job below is only registered here.
scheduler = BlockingScheduler()
# One-shot job: a "date" trigger at run_date, calling test(desc="123").
scheduler.add_job(
    test,
    trigger="date",
    run_date='2021-02-14 15:01:05',
    kwargs={"desc": "123"},
)

随机配对

import random
# Items to be paired up.
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

# Randomise the order in place, then take consecutive slices of two.
random.shuffle(a)

# Stepping by 2 gives the start index of each pair. With an odd number of
# items the last slice holds the single leftover element.
for start in range(0, len(a), 2):
    print(a[start:start + 2])

scrapy爬虫

import scrapy

class yxSpider(scrapy.Spider):
    """Spider for the sound-effect listing pages on sc.chinaz.com."""

    name = 'yinxiao'
    allowed_domains = ['sc.chinaz.com']
    # Numbered listing pages 2-9 first, then the un-numbered index page.
    start_urls = [
        *('http://sc.chinaz.com/yinxiao/index_{0}.html'.format(page) for page in range(2, 10)),
        'http://sc.chinaz.com/yinxiao/',
    ]

    def parse(self, response):
        """Yield one dict per ``music_block`` div found in the response.

        Each dict holds the extracted text nodes under ``p.z`` and the
        ``thumb`` attribute values under ``p.n1``.
        """
        music_blocks = response.xpath('//div[@class="text_left"]//div[@class="music_block"]')
        for block in music_blocks:
            yield {
                '名称': block.xpath('p[@class="z"]//text()').extract(),
                '下载链接': block.xpath('p[@class="n1"]//@thumb').extract(),
            }

celery自定义result_backend后端

自定义模块

#celery_backends:DatabaseBackend
from celery.backends.base import BaseDictBackend

class DatabaseBackend(BaseDictBackend):
    """Result backend stub that only prints what it is asked to store."""

    def _store_result(self, task_id, result, status, traceback=None, request=None):
        """Print the result, the task id and the request, in that order.

        ``status`` and ``traceback`` are accepted but unused. Nothing is
        persisted and the method returns ``None``.
        """
        for value in (result, task_id, request):
            print(value)

配置使用自定义模块

# When celery loads the result_backend setting it also consults registered
# entry points; django-celery-results relies on the same mechanism.
# Register an entry point so celery can resolve the custom module when it
# parses result_backend.
# distribution._ep_map.clear() # clears the registered entry_point
distribution = pkg_resources.Distribution(__name__)
entry_point = pkg_resources.EntryPoint.parse('yw-db = apps.common.modules.celery_backends:DatabaseBackend', dist=distribution)
# NOTE(review): _ep_map is a private pkg_resources attribute — confirm it
# still exists in the installed setuptools version.
distribution._ep_map = {'celery.result_backends': {'yw-db': entry_point}}
pkg_resources.working_set.add(distribution)


# Select the backend registered above by its entry-point name.
app.conf.result_backend = 'yw-db'

django-rest-framework ModelViewSet context添加内容

方法一:

ViewSet:

class LanguageViewSet(viewsets.ModelViewSet):
    """Language endpoints that pass an extra value to the serializer context."""

    queryset = Language.objects.all()
    serializer_class = LanguageSerializer
    filter_backends = (filters.DjangoFilterBackend, )
    filter_fields = ('name', 'active')

    def get_serializer_context(self):
        """Return the parent's serializer context with ``foo`` set to ``'bar'``."""
        extra = {'foo': 'bar'}
        context = super().get_serializer_context()
        context.update(extra)
        return context

Serializer:

class YourSerializer(serializers.Serializer):
    """Serializer that copies ``context['foo']`` into its output."""

    field = serializers.CharField()

    def to_representation(self, instance):
        """Return the parent's representation with a ``foo`` key added.

        Raises ``KeyError`` if the serializer context has no ``'foo'`` entry.
        """
        representation = super().to_representation(instance)
        # self.context is where contextual data is read from.
        representation['foo'] = self.context['foo']
        return representation

方法二:

创建自定义TemplateHTMLRenderer

class TemplateHTMLRendererWithContext(TemplateHTMLRenderer):
    """Template renderer that merges leftover renderer context into the data."""

    def render(self, data, accepted_media_type=None, renderer_context=None):
        """Render ``data`` with the template resolved for this response.

        Args:
            data: Mapping passed to the template. It is updated in place
                with the leftover renderer-context entries.
            accepted_media_type: Accepted but not used here.
            renderer_context: Mapping expected to contain ``view``,
                ``request``, ``response``, ``kwargs`` and ``args``. Those
                keys are popped, and whatever remains is merged into
                ``data``.

        Returns:
            The result of ``template_render`` for the resolved template.

        Raises:
            KeyError: If any of the five expected keys is missing.
        """
        # We can't really call super in this case, since we need to modify the inner working a bit
        renderer_context = renderer_context or {}
        view = renderer_context.pop('view')
        request = renderer_context.pop('request')
        response = renderer_context.pop('response')
        # Popped only so they are not merged into the template data below.
        renderer_context.pop('kwargs')
        renderer_context.pop('args')

        if response.exception:
            template = self.get_exception_template(response)
        else:
            template_names = self.get_template_names(response, view)
            template = self.resolve_template(template_names)

        # Pass the leftover renderer context on to resolve_context.
        context = self.resolve_context(data, request, response, renderer_context)
        # NOTE(review): template_render is not defined in this snippet —
        # presumably imported from rest_framework.compat; confirm it exists
        # in the installed DRF version.
        return template_render(template, context, request=request)

    def resolve_context(self, data, request, response, render_context):
        """Merge ``render_context`` into ``data`` and return ``data``.

        When the response is an exception response, ``status_code`` is
        added to ``data`` first.
        """
        if response.exception:
            data['status_code'] = response.status_code
        data.update(render_context)
        return data

通过get_renderer_context方法添加数据

class LanguageViewSet(viewsets.ModelViewSet):
    """Language endpoints that add an extra value to the renderer context."""

    queryset = Language.objects.all()
    serializer_class = LanguageSerializer
    filter_backends = (filters.DjangoFilterBackend, )
    filter_fields = ('name', 'active')

    def get_renderer_context(self):
        """Return the parent's renderer context with ``foo`` set to ``'bar'``."""
        extra = {'foo': 'bar'}
        context = super().get_renderer_context()
        context.update(extra)
        return context

参考:https://stackoverflow.com/questions/39202380/django-rest-framework-how-to-add-context-to-a-viewset?noredirect=1&lq=1

django执行原始sql

model raw执行原始sql:

# Run a hand-written SELECT through the model manager's raw() method.
raw_query = 'SELECT id, first_name FROM myapp_person'
people = Person.objects.raw(raw_query)

直接操作settings配置的默认数据库:

from django.db import connection

def my_custom_sql(self):
    """Set ``foo = 1`` for rows matching ``self.baz``, then read one row back.

    Both statements run on a cursor from the imported ``connection``
    object, with the value passed as a query parameter.

    Returns:
        Whatever ``cursor.fetchone()`` returns for the SELECT.
    """
    with connection.cursor() as cur:
        cur.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cur.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        return cur.fetchone()

settings配置多个数据库时指定数据库:

from django.db import connections

def my_custom_sql(self):
    """Same update-then-select as above, on the ``my_db_alias`` connection.

    Returns:
        Whatever ``cursor.fetchone()`` returns for the SELECT.
    """
    # Pick the connection by its alias instead of using the default one.
    database = connections['my_db_alias']
    with database.cursor() as cur:
        cur.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cur.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        fetched = cur.fetchone()

    return fetched

执行sql结果以dict返回,同pymysql cursor = db.cursor(pymysql.cursors.DictCursor):

# 自定义函数
def dictfetchall(cursor):
    """Return every remaining row of ``cursor`` as a list of dicts.

    Keys are the first element of each entry in ``cursor.description``;
    values are taken positionally from each row of ``cursor.fetchall()``.
    """
    names = [column[0] for column in cursor.description]
    records = []
    for values in cursor.fetchall():
        records.append(dict(zip(names, values)))
    return records

with connections['my_db_alias'].cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    # An UPDATE yields no result set, so run the SELECT before reading
    # rows; dictfetchall() needs cursor.description to be populated.
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = dictfetchall(cursor)

参考:https://docs.djangoproject.com/en/2.1/topics/db/sql/#executing-custom-sql-directly

django获取当前url

  • request.scheme: 获取http/https类型

  • request.get_host(): 获取url服务器地址

  • request.path: 获取url 路径

  • request.build_absolute_uri():获取完整的url

  • from django.urls import resolve

    resolve(self.request.path).url_name:获取当前url name

  • from django.urls import reverse

    reverse(resolve(self.request.path).url_name):根据url name获取url路径

celery 调用ansible api没有返回

解决方法:

import multiprocessing

# multiprocessing does not allow a daemonic process to create child
# processes; per the note this snippet comes from, that is why the ansible
# command never ran and the result came back empty.
current = multiprocessing.current_process()
current._config['daemon'] = False  # turn the daemon flag off

daemon子进程不能再通过multiprocessing创建后代进程,否则当父进程退出后,它终结其daemon子进程,那孙子进程就成了孤立的进程了。

当尝试这么做时,会报错:AssertionError: daemonic processes are not allowed to have children。但是,daemon子进程还可以通过subprocess创建后代进程。

参考:https://www.jianshu.com/p/4c916810cf8d

flask 接口授权认证

https://github.com/responsible/Flask-Restful-Role-Auth

https://pythonhosted.org/Flask-JWT/

https://www.jianshu.com/p/2367daef7fa6

Python 生成pyc文件

支持指定一个或多个文件(相对或绝对路径均可);如需编译整个目录,请使用 python -m compileall <目录>

python -m py_compile /tmp/a.py

python 读取配置文件

[db]
db_port = 3306
db_user = root
db_host = 127.0.0.1
db_pass = xgmtest

[concurrent]
processor = 20
thread = 10
try:
    import configparser  # Python 3 module name
except ImportError:
    import ConfigParser as configparser  # Python 2 fallback

# Create an empty parser instance.
config = configparser.ConfigParser()
# Load the ini file `filename` into the parser.
config.read(filename)
# Save the configuration; the with-block closes the file handle.
with open(filename, 'w') as config_file:
    config.write(config_file)
config.get(section, key)  # value of `key` in `section`
config.getint(section, key)  # same lookup, converted to int
config.getfloat(section, key)  # same lookup, converted to float
config.getboolean(section, key)  # same lookup, converted to bool
# Set key=value in `section`; raises NoSectionError if the section is missing.
config.set(section, key, value)


# Add a section; raises DuplicateSectionError if it already exists.
config.add_section(section)
# Remove a section; returns True if it existed, False otherwise.
config.remove_section(section)
# Remove `key` from `section`; returns True if it existed, False otherwise.
# Raises NoSectionError if the section is missing.
config.remove_option(section, key)
config.items(section)  # list of (name, value) pairs in `section`
config.sections()  # list of all section names
config.options(section)  # option names of `section` (the key in key=value)

config.has_section(section)  # whether the section exists
config.has_option(section, option)  # True if `section` has `option`, else False

django 模板for支持zip

  1. pip install django-multiforloop
  2. Include 'multiforloop' in your settings.py's list of installed apps
  3. Add {% load multifor %} to the top of any templates which use the multiforloop

模板加载错误解决方法 -Python 2:

Add to pip requirements.py django-multiforloop-patched >= 0.2.0

-Python 3:

1 -> Copy multiforloop/templatetags to a templatetags folder in your project.

2 -> replace from itertools import izip_longest with from itertools import zip_longest

replace from django.template.base import Library with from django import template

replace register = Library() with register = template.Library()

https://github.com/gabrielgrant/django-multiforloop