Main
KKorack GPIO setting.pptx
startKK.sh
-
#!/bin/bash echo echo '##### START KKo KKo Rack #####' echo wpa_cli -i wlan0 status rm /home/pi/work/opencv/nohup.out cd /home/pi/work/opencv/ #nohup python3 /home/pi/work/opencv/KKserver.py & sleep 3 #tail -f /home/pi/work/opencv/nohup.out echo echo '##### STARTed KKo KKo Rack #####' echo
stopKK.sh
-
#!/bin/bash echo echo '##### STOP KKo KKo Rack #####' echo ps -ef | grep KKserver.py | grep -v grep KILLPID=`ps -ef | grep KKserver.py | grep -v grep | awk '{print($2)}'` echo "Process = " $KILLPID kill -9 $KILLPID echo echo '##### STOPed KKo KKo Rack #####' echo
statusKK.sh
-
#!/bin/bash echo echo '##### STATUS KKo KKo Rack #####' echo ps -ef | grep KKserver.py | grep -v grep KKORACKPID=`ps -ef | grep KKserver.py | grep -v grep | awk '{print($2)}'` echo "Process = " $KKORACKPID echo echo '##### STATUS KKo KKo Rack #####' echo
KKserver.py
-
from flask import Flask, render_template, send_from_directory, Response, send_file, request, redirect, url_for from flask_socketio import SocketIO from pathlib import Path from capture import capture_and_save from usbwebcamera import UsbWebCamera from piwebcamera import PiWebCamera import argparse, logging, logging.config, conf import os from gpioControl import GpioControl from listenSpeech import ListenSpeech from urllib.parse import parse_qs from power import PowerStatus from noise_detector import Noise_Detector app = Flask(__name__) socketio = SocketIO(app) archive_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'archive') gpio = GpioControl() listenspeech = ListenSpeech() power = PowerStatus() logging.config.dictConfig(conf.dictConfig) logger = logging.getLogger(__name__) usbcamera = UsbWebCamera(video_source=1, do_display=False) usbcamera.start() picamera = PiWebCamera(video_source=0, do_display=False) picamera.start() # Setup detectors nd = Noise_Detector() nd.start() @app.after_request def add_header(r): """ Add headers to both force latest IE rendering or Chrome Frame, and also to cache the rendered page for 10 minutes """ r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate" r.headers["Pragma"] = "no-cache" r.headers["Expires"] = "0" r.headers["Cache-Control"] = "public, max-age=0" return r @app.route("/") def entrypoint1(): logger.debug("Requested /") return render_template("index.html") @app.route("/index.html") def entrypoint2(): logger.debug("Requested /") return render_template("index.html") @app.route("/usbcapture") def captureusb(): logger.debug("Requested USB capture") im = usbcamera.get_frame(_bytes=False) capture_and_save(im) return render_template("send_to_init.html") @app.route("/picapture") def capturepi(): logger.debug("Requested PICAM capture") im = picamera.get_frame(_bytes=False) capture_and_save(im) return render_template("send_to_init.html") @app.route("/video/last_video") def last_video(): logger.debug("Requested last video") for filename in sorted(os.listdir(archive_path), reverse=True): if not filename.startswith('.'): type = get_type(filename) if type == "video": return send_from_directory(archive_path, filename) @app.route("/audio/last_audio") def last_wav(): logger.debug("Requested last video") for filename in sorted(os.listdir(archive_path), reverse=True): if not filename.startswith('.'): type = get_type(filename) if type == "audio": return send_from_directory(archive_path, filename) ''' ##### Achive File Section ##### ''' @app.route('/archive') def archive(): return render_template('archive.html') def get_type(filename): name, extension = os.path.splitext(filename) return 'video' if extension == '.mp4' else 'audio' if extension == '.wav' else 'audio' if extension == '.mp3' else 'photo' @app.route('/archive/<string:filename>') def archive_item(filename): name, extension = os.path.splitext(filename) type = get_type(filename) return render_template('record.html', filename=filename, type=type) @app.route('/archive/delete/<string:filename>') def archive_delete(filename): os.remove(archive_path + "/" + filename) return redirect(url_for('archive')) @app.route('/archive/play/<string:filename>') def archive_play(filename): return send_file('archive/' + filename) def get_records(): records = [] for filename in sorted(os.listdir(archive_path), reverse=True): if not filename.startswith('.'): type = get_type(filename) size = byte_to_mb(os.path.getsize(archive_path + "/" + filename)) record = {"filename": filename, 'size': size, 'type': type} records.append(record) return records def byte_to_mb(byte): mb = "{:.2f}".format(byte / 1024 / 1024) return str(mb) + " MB" app.jinja_env.globals.update(get_records=get_records) ''' ##### Achive File Section ##### ''' def genpi(picamera): logger.debug("Starting PI stream") while True: frame = picamera.get_frame() yield (b'--frame\r\n' b'Content-Type: image/png\r\n\r\n' + frame + b'\r\n') @app.route("/bothstream") def bothstream_page(): logger.debug("Requested stream page") return render_template("bothstream.html") @app.route("/usbstream") def usbstream_page(): logger.debug("Requested stream page") return render_template("usbstream.html") @app.route("/pistream") def pistream_page(): logger.debug("Requested stream page") return render_template("pistream.html") @app.route("/video_usb_feed") def video_usb_feed(): # return Response(genusb(usbcamera), # mimetype="multipart/x-mixed-replace; boundary=frame") def gen_video(): while True: frame = usbcamera.get_frame() if frame is not None: yield (b'--frame\r\n' b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n') return Response( gen_video(), mimetype='multipart/x-mixed-replace; boundary=frame' ) @app.route("/video_pi_feed") def video_pi_feed(): return Response(genpi(picamera), mimetype="multipart/x-mixed-replace; boundary=frame") @app.route("/temperature") def temperature(): content = os.popen("vcgencmd measure_temp").readline() content = content.replace("temp=", "") powerstatus = power.getPowerStatus() return Response(content+"["+powerstatus+"]", mimetype='text/xml') ''' ##### Control Camera Section ##### ''' @app.route("/msg", methods=['GET', 'POST']) def msg(): if request.method == 'POST': message = request.form['textinput'] if not message : reString = "No received message" else : reString = listenspeech.speech(message) print("Get Message = " + reString) return reString @app.route("/init") def init(): reString = "Camera position init : " + gpio.initMotorPosition() print("Move init ...") return Response(reString, mimetype='text/html') @app.route("/up") def up(): reString = gpio.moveUp() print("Move Up ...") return Response(reString, mimetype='text/html') @app.route("/down") def down(): reString = gpio.moveDown() print("Move Down ...") return Response(reString, mimetype='text/html') @app.route("/left") def left(): reString = gpio.moveLeft() print("Move Left ...") return Response(reString, mimetype='text/html') @app.route("/right") def right(): reString = gpio.moveRight() print("Move Right ...") return Response(reString, mimetype='text/html') ''' ##### Caterpillar Tracks ##### ''' @app.route("/forward", methods=['GET', 'POST']) def forward(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goForward(moveOrder) print("Go Forward ...("+moveOrder+")") return reString @app.route("/backward", methods=['GET', 'POST']) def backward(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goBackward(moveOrder) print("Go Backward ...("+moveOrder+")") return reString @app.route("/turnleftback", methods=['GET', 'POST']) def turnleftback(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goTurnleftback(moveOrder) print("Go Turn Left back...("+moveOrder+")") return reString @app.route("/turnrightback", methods=['GET', 'POST']) def turnrightback(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goTurnrightback(moveOrder) print("Go Turn Right back...("+moveOrder+")") return reString @app.route("/turnleftforward", methods=['GET', 'POST']) def turnleftforward(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goTurnleftforward(moveOrder) print("Go Turn Left forward...("+moveOrder+")") return reString @app.route("/turnrightforward", methods=['GET', 'POST']) def turnrightforward(): if request.method == 'POST': moveOrder = request.form['moveCnt'] if not moveOrder : reString = "Do not move" else : reString = gpio.goTurnrightforward(moveOrder) print("Go Turn Right forward...("+moveOrder+")") return reString @app.route("/favorit.ico") def favorit_ico(): logger.debug("Requested favorit.ico image") filename = "favorit.ico" return send_file(filename) if __name__=="__main__": # socketio.run(app,host="0.0.0.0",port="3005",threaded=True) parser = argparse.ArgumentParser() parser.add_argument('-p','--port',type=int,default=8081, help="Running port") parser.add_argument("-H","--host",type=str,default='0.0.0.0', help="Address to broadcast") args = parser.parse_args() logger.debug("Starting server") # When it change , it was reloaded #app.run(host=args.host,port=args.port) #app.run(host=args.host,port=args.port,debug=True,use_reloader=False) socketio.run(app, log_output=True, host='0.0.0.0', port=8081, debug=True, use_reloader=False)