python
python http服务器
import http.server
import socketserver
# Port the server listens on.
# NOTE(review): binding to a port below 1024 usually needs elevated
# privileges on Unix-like systems — confirm for the target host.
PORT = 80

# Request handler class from the standard library, used unchanged.
Handler = http.server.SimpleHTTPRequestHandler

# An empty host string binds to all interfaces. The context manager closes
# the listening socket when the block is left (for example after
# serve_forever() is interrupted).
with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("serving at port", PORT)
    httpd.serve_forever()
python 获取本机 IP
import socket
def get_host_ip():
    """Return the local IP address the OS would use to reach 8.8.8.8.

    A UDP socket is connected to ``8.8.8.8:80`` and the address it was
    bound to is read back with ``getsockname()``.

    Returns:
        The first element of ``getsockname()``, i.e. the local address
        as a string.

    Raises:
        OSError: if the socket cannot be created or connected (for
            example when there is no route to the target).
    """
    # The context manager closes the socket on every exit path, and a
    # failure inside socket.socket() itself propagates unchanged instead of
    # being masked by cleanup code that touches an unbound variable.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(('8.8.8.8', 80))
        return s.getsockname()[0]
python 后台进程
#!/usr/bin/env python3
import atexit
import os
import signal
import sys
def daemonize(pidfile, *, stdin='/dev/null',
              stdout='/dev/null',
              stderr='/dev/null'):
    """Turn the calling process into a detached background process.

    The process forks twice (the parent exits each time), changes its
    working directory to ``/``, resets the umask, starts a new session,
    redirects the three standard streams, and records its PID in *pidfile*.

    Args:
        pidfile: Path of the PID file. It must not exist yet; it is written
            after the second fork and removed again at interpreter exit.
        stdin: File opened for reading and duplicated onto standard input.
        stdout: File opened for appending and duplicated onto standard output.
        stderr: File opened for appending and duplicated onto standard error.

    Raises:
        RuntimeError: if *pidfile* already exists or one of the forks fails.
        SystemExit: raised with status 0 in each parent process after a
            successful fork, so only the final child returns from this call.
    """
    if os.path.exists(pidfile):
        raise RuntimeError('Already running')

    # First fork (detaches from parent)
    try:
        if os.fork() > 0:
            raise SystemExit(0)   # Parent exit
    except OSError as e:
        # Chain the OS error so the errno is not lost.
        raise RuntimeError('fork #1 failed.') from e

    os.chdir('/')
    os.umask(0)
    os.setsid()

    # Second fork (relinquish session leadership)
    try:
        if os.fork() > 0:
            raise SystemExit(0)
    except OSError as e:
        raise RuntimeError('fork #2 failed.') from e

    # Flush I/O buffers
    sys.stdout.flush()
    sys.stderr.flush()

    # Replace file descriptors for stdin, stdout, and stderr.
    # Buffering is disabled (third argument 0) on the temporary file objects;
    # only their descriptors are kept, via dup2.
    with open(stdin, 'rb', 0) as f:
        os.dup2(f.fileno(), sys.stdin.fileno())
    with open(stdout, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stdout.fileno())
    with open(stderr, 'ab', 0) as f:
        os.dup2(f.fileno(), sys.stderr.fileno())

    # Write the PID file
    with open(pidfile, 'w') as f:
        print(os.getpid(), file=f)

    # Arrange to have the PID file removed on exit/signal
    atexit.register(lambda: os.remove(pidfile))

    # Signal handler for termination (required): converting SIGTERM into
    # SystemExit lets the interpreter shut down normally, so the atexit
    # hook above gets a chance to run.
    def sigterm_handler(signo, frame):
        raise SystemExit(1)

    signal.signal(signal.SIGTERM, sigterm_handler)
def main():
    """Write a start-up line, then a heartbeat line every 10 seconds, forever.

    Output goes to ``sys.stdout``. Each line is flushed right after it is
    written so it is not held back in the stream buffer between heartbeats.
    This function never returns.
    """
    import time

    sys.stdout.write('Daemon started with pid {}\n'.format(os.getpid()))
    sys.stdout.flush()
    while True:
        sys.stdout.write('Daemon Alive! {}\n'.format(time.ctime()))
        sys.stdout.flush()
        time.sleep(10)
# Command-line entry point: "start" daemonizes and runs main(); "stop" sends
# SIGTERM to the PID recorded in the PID file.
if __name__ == '__main__':
    PIDFILE = '/tmp/daemon.pid'

    if len(sys.argv) != 2:
        print('Usage: {} [start|stop]'.format(sys.argv[0]), file=sys.stderr)
        raise SystemExit(1)

    if sys.argv[1] == 'start':
        try:
            # Both output streams are appended to the same log file.
            daemonize(PIDFILE,
                      stdout='/tmp/daemon.log',
                      stderr='/tmp/daemon.log')
        except RuntimeError as e:
            print(e, file=sys.stderr)
            raise SystemExit(1)

        main()

    elif sys.argv[1] == 'stop':
        if os.path.exists(PIDFILE):
            with open(PIDFILE) as f:
                os.kill(int(f.read()), signal.SIGTERM)
        else:
            print('Not running', file=sys.stderr)
            raise SystemExit(1)

    else:
        print('Unknown command {!r}'.format(sys.argv[1]), file=sys.stderr)
        raise SystemExit(1)
参考:https://python3-cookbook.readthedocs.io/zh_CN/latest/c12/p14_launching_daemon_process_on_unix.html
tornado 异步demo
from re import T
import tornado.ioloop
import tornado.web
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor
class MainHandler(tornado.web.RequestHandler):
    """Demo handler: simple GET/PUT/DELETE replies and a POST that waits on
    blocking work executed in a thread pool."""

    # Thread pool used by the @run_on_executor-decorated method below.
    executor = ThreadPoolExecutor(max_workers=2)

    def get(self):
        """Reply with a fixed greeting."""
        self.write("Hello, world")

    @run_on_executor
    def aa(self):
        """Run a blocking ``ping qq.com`` and return ``'qq.com'``.

        NOTE(review): no packet count is passed to ``ping``; on platforms
        where ping runs until interrupted this call would occupy a pool
        thread indefinitely — confirm on the target OS.
        """
        import os
        os.system('ping qq.com')
        return 'qq.com'

    @tornado.gen.coroutine
    def post(self):
        """Print two optional request arguments, wait for aa(), then finish."""
        print(self.get_argument('aa121', None))
        print(self.get_argument('ff', None))
        # Yielding suspends this coroutine until the executor call is done.
        yield self.aa()
        self.finish()

    def put(self):
        """Reply with the fixed body "222"."""
        self.write("222")

    def delete(self):
        """Reply with the fixed body "333"."""
        self.write("333")
def make_app():
    """Build the Tornado application: MainHandler on "/" with debug enabled."""
    return tornado.web.Application([
        (r"/", MainHandler),
    ], debug=True)
# Script entry point: build the app, listen on port 8888 and run the IO loop.
if __name__ == "__main__":
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()
python 定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
def test(**kwargs):
    """Placeholder job callable: accepts any keyword arguments, does nothing."""
    pass
# Create the scheduler and register a single job with a "date" trigger that
# calls test(desc="123") at the given run_date.
# NOTE(review): this snippet never calls scheduler.start(); confirm the
# scheduler is started elsewhere, otherwise the job is only registered.
scheduler = BlockingScheduler()
job_options = {
    "trigger": "date",
    "run_date": '2021-02-14 15:01:05',
    "kwargs": {"desc": "123"},
}
scheduler.add_job(test, **job_options)
随机配对
import random
# Shuffle the items in place, then print them two at a time.
a = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
random.shuffle(a)

# Start index of each pair: 0, 2, 4, ...
d = list(range(0, len(a), 2))
for i in d:
    # Slicing never overruns: with an odd number of items the final slice
    # would simply contain a single element.
    print(a[i:i + 2])
scrapy爬虫
import scrapy
class yxSpider(scrapy.Spider):
    """Spider for the /yinxiao/ listing pages of sc.chinaz.com."""

    name = 'yinxiao'
    allowed_domains = ['sc.chinaz.com']
    # Numbered listing pages index_2 .. index_9, plus the un-numbered
    # first page appended afterwards.
    start_urls = ['http://sc.chinaz.com/yinxiao/index_{0}.html'.format(i) for i in range(2, 10)]
    start_urls.append('http://sc.chinaz.com/yinxiao/')

    def parse(self, response):
        """Yield one dict per ``music_block`` div found on the page.

        Each dict holds the text nodes under ``p.z`` and the ``thumb``
        attribute values under ``p.n1``; both values are lists as returned
        by ``extract()``.
        """
        for i in response.xpath('//div[@class="text_left"]//div[@class="music_block"]'):
            yield {
                # Key means "name".
                '名称': i.xpath('p[@class="z"]//text()').extract(),
                # Key means "download link".
                '下载链接': i.xpath('p[@class="n1"]//@thumb').extract()
            }
celery自定义result_backend后端
自定义模块
#celery_backends:DatabaseBackend
from celery.backends.base import BaseDictBackend
class DatabaseBackend(BaseDictBackend):
    """Custom Celery result backend that only prints what it is asked to store."""

    def _store_result(self, task_id, result, status, traceback=None, request=None):
        """Print the result, task id and request; nothing is persisted.

        ``status`` and ``traceback`` are accepted but not used here.
        """
        print(result)
        print(task_id)
        print(request)
配置使用自定义模块
# When Celery loads the result_backend setting it also consults registered
# entry points; django-celery-results relies on the same mechanism.
# Register an entry point so that Celery can find the custom module when it
# resolves result_backend.
# NOTE(review): `pkg_resources` and `app` are not imported/defined in this
# snippet — they are presumably provided by the surrounding module.
# distribution._ep_map.clear()  # clears the registered entry points
distribution = pkg_resources.Distribution(__name__)
entry_point = pkg_resources.EntryPoint.parse('yw-db = apps.common.modules.celery_backends:DatabaseBackend', dist=distribution)
distribution._ep_map = {'celery.result_backends': {'yw-db': entry_point}}
pkg_resources.working_set.add(distribution)
# Select the custom backend by the entry-point name registered above.
app.conf.result_backend = 'yw-db'
django-rest-framework ModelViewSet context添加内容
方法一:
ViewSet:
class LanguageViewSet(viewsets.ModelViewSet):
    """ModelViewSet for Language that adds an extra key to the serializer context."""

    queryset = Language.objects.all()
    serializer_class = LanguageSerializer
    filter_backends = (filters.DjangoFilterBackend, )
    filter_fields = ('name', 'active')

    def get_serializer_context(self):
        """Return the parent's serializer context with ``foo`` set to ``'bar'``."""
        context = super().get_serializer_context()
        context['foo'] = 'bar'
        return context
Serializer:
class YourSerializer(serializers.Serializer):
    """Serializer that copies ``foo`` from its context into the output."""

    field = serializers.CharField()

    def to_representation(self, instance):
        """Return the parent's representation with ``foo`` added.

        Raises:
            KeyError: if the serializer context has no ``'foo'`` entry.
        """
        ret = super().to_representation(instance)
        # Access self.context here to add contextual data into ret
        ret['foo'] = self.context['foo']
        return ret
方法二:
创建自定义TemplateHTMLRenderer
class TemplateHTMLRendererWithContext(TemplateHTMLRenderer):
    """TemplateHTMLRenderer variant that merges the remaining renderer
    context entries into the template context."""

    def render(self, data, accepted_media_type=None, renderer_context=None):
        """Render *data* with the resolved template.

        The view, request, response, kwargs and args entries are taken out
        of the renderer context; whatever is left is merged into *data* by
        resolve_context() before rendering.

        Raises:
            KeyError: if one of the five expected entries is missing from
                *renderer_context*.
        """
        # We can't really call super in this case, since we need to modify the inner working a bit
        # Work on a copy so the caller's dict is not emptied by the pops.
        renderer_context = dict(renderer_context or {})

        view = renderer_context.pop('view')
        request = renderer_context.pop('request')
        response = renderer_context.pop('response')
        # Popped only to keep them out of the template context.
        renderer_context.pop('kwargs')
        renderer_context.pop('args')

        if response.exception:
            template = self.get_exception_template(response)
        else:
            template_names = self.get_template_names(response, view)
            template = self.resolve_template(template_names)

        context = self.resolve_context(data, request, response, renderer_context)
        # NOTE(review): template_render is not defined in this snippet —
        # presumably imported from the framework elsewhere; confirm.
        return template_render(template, context, request=request)

    def resolve_context(self, data, request, response, render_context):
        """Return *data* updated in place with *render_context*.

        On an exception response the status code is stored under
        ``status_code`` first.
        """
        if response.exception:
            data['status_code'] = response.status_code
        data.update(render_context)
        return data
通过get_renderer_context方法添加数据
class LanguageViewSet(viewsets.ModelViewSet):
    """ModelViewSet for Language that adds an extra key to the renderer context."""

    queryset = Language.objects.all()
    serializer_class = LanguageSerializer
    filter_backends = (filters.DjangoFilterBackend, )
    filter_fields = ('name', 'active')

    def get_renderer_context(self):
        """Return the parent's renderer context with ``foo`` set to ``'bar'``."""
        context = super().get_renderer_context()
        context['foo'] = 'bar'
        return context
django执行原始sql
model raw执行原始sql:
# Run a hand-written SELECT through the model manager's raw() method.
raw_query = 'SELECT id, first_name FROM myapp_person'
people = Person.objects.raw(raw_query)
直接操作settings配置的默认数据库:
from django.db import connection
def my_custom_sql(self):
    """Update ``bar`` for ``self.baz`` on the default connection, then read it back.

    Both statements pass ``self.baz`` as a query parameter rather than
    formatting it into the SQL text.

    Returns:
        The result of ``cursor.fetchone()`` for the SELECT.
    """
    with connection.cursor() as cursor:
        cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        row = cursor.fetchone()
    return row
settings配置多个数据库时指定数据库:
from django.db import connections
def my_custom_sql(self):
    """Update ``bar`` for ``self.baz`` on the ``my_db_alias`` connection, then read it back.

    Both statements pass ``self.baz`` as a query parameter rather than
    formatting it into the SQL text.

    Returns:
        The result of ``cursor.fetchone()`` for the SELECT.
    """
    with connections['my_db_alias'].cursor() as cursor:
        cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
        cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
        row = cursor.fetchone()
    return row
执行sql结果以dict返回,同pymysql cursor = db.cursor(pymysql.cursors.DictCursor):
# Custom helper function
def dictfetchall(cursor):
    """Return all remaining rows of *cursor* as a list of dicts.

    Column names are taken from the first element of each entry in
    ``cursor.description`` and zipped with every row.

    Returns:
        A list with one ``{column_name: value}`` dict per row; an empty
        list when the cursor has no result set (``description`` is None).
    """
    # Statements that produce no result set leave description as None;
    # report "no rows" instead of failing while iterating over None.
    if cursor.description is None:
        return []
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]
# Usage example: run the statements on the aliased connection and collect the
# SELECT result as a list of dicts.
with connections['my_db_alias'].cursor() as cursor:
    cursor.execute("UPDATE bar SET foo = 1 WHERE baz = %s", [self.baz])
    # An UPDATE yields no result set, so a SELECT is issued before fetching.
    cursor.execute("SELECT foo FROM bar WHERE baz = %s", [self.baz])
    row = dictfetchall(cursor)
参考:https://docs.djangoproject.com/en/2.1/topics/db/sql/#executing-custom-sql-directly
django获取当前url
-
request.scheme: 获取http/https类型
-
request.get_host(): 获取url服务器地址
-
request.path: 获取url 路径
-
request.build_absolute_uri():获取完整的url
-
from django.urls import resolve
resolve(self.request.path).url_name:获取当前url name
-
from django.urls import reverse
reverse(resolve(self.request.path).url_name):根据url name获取url路径
celery 调用ansible api没有返回
解决方法:
import multiprocessing
# multiprocessing does not allow a daemonic process to create child
# processes; per the note this snippet accompanies, that is why the ansible
# command produced no result when invoked from the worker.
multiprocessing.current_process()._config['daemon'] = False # turn off the daemon flag of the current process
daemon子进程不能再通过multiprocessing创建后代进程,否则当父进程退出后,它终结其daemon子进程,那孙子进程就成了孤立的进程了。
当尝试这么做时,会报错:AssertionError: daemonic processes are not allowed to have children。但是,daemon子进程还可以通过subprocess创建后代进程。
参考:https://www.jianshu.com/p/4c916810cf8d
flask 接口授权认证
https://github.com/responsible/Flask-Restful-Role-Auth
https://pythonhosted.org/Flask-JWT/
https://www.jianshu.com/p/2367daef7fa6
Python 生成pyc文件
支持指定文件或路径
python -m py_compile /tmp/a.py
python 读取配置文件
[db]
db_port = 3306
db_user = root
db_host = 127.0.0.1
db_pass = xgmtest
[concurrent]
processor = 20
thread = 10