Skip to main content

Main

KKorack GPIO setting.pptx

 

startKK.sh

  • #!/bin/bash
    
    echo
    echo '##### START KKo KKo Rack #####'
    echo
    
    wpa_cli -i wlan0 status 
    rm /home/pi/work/opencv/nohup.out
    cd /home/pi/work/opencv/
    #nohup python3 /home/pi/work/opencv/KKserver.py &
    
    sleep 3
    #tail -f /home/pi/work/opencv/nohup.out
    
    echo
    echo '##### STARTed KKo KKo Rack #####'
    echo
    

 

stopKK.sh

  • #!/bin/bash
    
    echo
    echo '##### STOP  KKo KKo Rack #####'
    echo
    
    
    ps -ef | grep KKserver.py | grep -v grep
    KILLPID=`ps -ef | grep KKserver.py | grep -v grep | awk '{print($2)}'`
    echo "Process = " $KILLPID
    kill -9 $KILLPID
    
    echo
    echo '##### STOPed KKo KKo Rack #####'
    echo

 

statusKK.sh

  • #!/bin/bash
    
    echo
    echo '##### STATUS  KKo KKo Rack #####'
    echo
    
    
    ps -ef | grep KKserver.py | grep -v grep
    KKORACKPID=`ps -ef | grep KKserver.py | grep -v grep | awk '{print($2)}'`
    echo "Process = " $KKORACKPID
    
    echo
    echo '##### STATUS KKo KKo Rack #####'
    echo

 

KKserver.py


  • from flask import Flask, render_template, send_from_directory, Response, send_file, request, redirect, url_for
    from flask_socketio import SocketIO
    from pathlib import Path
    from capture import capture_and_save
    from usbwebcamera import UsbWebCamera
    from piwebcamera import PiWebCamera
    import argparse, logging, logging.config, conf
    import os
    from gpioControl import GpioControl
    from listenSpeech import ListenSpeech
    from urllib.parse import parse_qs
    from power import PowerStatus
    from noise_detector import Noise_Detector
    
    
    
    
    
    
    app = Flask(__name__)
    socketio = SocketIO(app)
    archive_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'archive')
    
    
    gpio = GpioControl()
    listenspeech = ListenSpeech()
    power = PowerStatus()
    
    logging.config.dictConfig(conf.dictConfig)
    logger = logging.getLogger(__name__)
    
    usbcamera = UsbWebCamera(video_source=1, do_display=False)
    usbcamera.start()
    
    picamera = PiWebCamera(video_source=0, do_display=False)
    picamera.start()
    
    # Setup detectors
    nd = Noise_Detector()
    nd.start()
    
    @app.after_request
    def add_header(r):
        """
        Add headers to both force latest IE rendering or Chrome Frame,
        and also to cache the rendered page for 10 minutes
        """
        r.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
        r.headers["Pragma"] = "no-cache"
        r.headers["Expires"] = "0"
        r.headers["Cache-Control"] = "public, max-age=0"
        return r
    
    
    @app.route("/")
    def entrypoint1():
        logger.debug("Requested /")
        return render_template("index.html")
    
    @app.route("/index.html")
    def entrypoint2():
        logger.debug("Requested /")
        return render_template("index.html")
    
    @app.route("/usbcapture")
    def captureusb():
        logger.debug("Requested USB capture")
        im = usbcamera.get_frame(_bytes=False)
        capture_and_save(im)
        return render_template("send_to_init.html")
    
    @app.route("/picapture")
    def capturepi():
        logger.debug("Requested PICAM capture")
        im = picamera.get_frame(_bytes=False)
        capture_and_save(im)
        return render_template("send_to_init.html")
    
    @app.route("/video/last_video")
    def last_video():
        logger.debug("Requested last video")
        for filename in sorted(os.listdir(archive_path), reverse=True):
            if not filename.startswith('.'):
                type = get_type(filename)
                if type == "video":
                    return send_from_directory(archive_path, filename)
    
    @app.route("/audio/last_audio")
    def last_wav():
        logger.debug("Requested last video")
        for filename in sorted(os.listdir(archive_path), reverse=True):
            if not filename.startswith('.'):
                type = get_type(filename)
                if type == "audio":
                    return send_from_directory(archive_path, filename)
    
            
    ''' ##### Achive File Section ##### '''
    @app.route('/archive')
    def archive():
            return render_template('archive.html')
    
    def get_type(filename):
            name, extension = os.path.splitext(filename)
            return 'video' if extension == '.mp4' else 'audio' if extension == '.wav' else 'audio' if extension == '.mp3' else 'photo'
    
    @app.route('/archive/<string:filename>')
    def archive_item(filename):
            name, extension = os.path.splitext(filename)
            type = get_type(filename)
            return render_template('record.html', filename=filename, type=type)
    
    @app.route('/archive/delete/<string:filename>')
    def archive_delete(filename):
            os.remove(archive_path + "/" + filename)
            return redirect(url_for('archive'))
    
    @app.route('/archive/play/<string:filename>')
    def archive_play(filename):
            return send_file('archive/' + filename)
            
    def get_records():
            records = []
    
            for filename in sorted(os.listdir(archive_path), reverse=True):
                    if not filename.startswith('.'):
                            type = get_type(filename)
                            size = byte_to_mb(os.path.getsize(archive_path + "/" + filename))
                            record = {"filename": filename, 'size': size, 'type': type}
                            records.append(record)
    
            return records
            
    def byte_to_mb(byte):
            mb = "{:.2f}".format(byte / 1024 / 1024)
            return str(mb) + " MB"
    
    app.jinja_env.globals.update(get_records=get_records)
    
    ''' ##### Achive File Section ##### '''
            
    def genpi(picamera):
        logger.debug("Starting PI stream")
        while True:
            frame = picamera.get_frame()
            yield (b'--frame\r\n'
                   b'Content-Type: image/png\r\n\r\n' + frame + b'\r\n')
    
    @app.route("/bothstream")
    def bothstream_page():
        logger.debug("Requested stream page")
        return render_template("bothstream.html")
    
    @app.route("/usbstream")
    def usbstream_page():
        logger.debug("Requested stream page")
        return render_template("usbstream.html")
    
    @app.route("/pistream")
    def pistream_page():
        logger.debug("Requested stream page")
        return render_template("pistream.html")
    
    
    @app.route("/video_usb_feed")
    def video_usb_feed():
    #    return Response(genusb(usbcamera),
    #        mimetype="multipart/x-mixed-replace; boundary=frame")
        def gen_video():
            while True:
                frame = usbcamera.get_frame()
    
                if frame is not None:
                    yield (b'--frame\r\n'
                        b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
    
        return Response(
            gen_video(),
            mimetype='multipart/x-mixed-replace; boundary=frame'
        )
    
    @app.route("/video_pi_feed")
    def video_pi_feed():
        return Response(genpi(picamera),
            mimetype="multipart/x-mixed-replace; boundary=frame")
    
    
    @app.route("/temperature")
    def temperature():
        content = os.popen("vcgencmd measure_temp").readline()
        content = content.replace("temp=", "")
        powerstatus = power.getPowerStatus()
        return Response(content+"["+powerstatus+"]", mimetype='text/xml')
    
    
    ''' ##### Control Camera Section ##### '''
    @app.route("/msg", methods=['GET', 'POST'])
    def msg():
         if request.method == 'POST':
            message = request.form['textinput'] 
            if not message :
                 reString = "No received message"
            else :
                 reString = listenspeech.speech(message)
            print("Get Message = " + reString)
         return reString
    
    @app.route("/init")
    def init():
        reString = "Camera position init : " + gpio.initMotorPosition()
        print("Move init ...")
        return Response(reString, mimetype='text/html')
    
    @app.route("/up")
    def up():
        reString = gpio.moveUp()
        print("Move Up ...")
        return Response(reString, mimetype='text/html')
        
    @app.route("/down")
    def down():
        reString = gpio.moveDown()
        print("Move Down ...")
        return Response(reString, mimetype='text/html')
    
    @app.route("/left")
    def left():
        reString = gpio.moveLeft()
        print("Move Left ...")
        return Response(reString, mimetype='text/html')
    
    @app.route("/right")
    def right():
        reString = gpio.moveRight()
        print("Move Right ...")
        return Response(reString, mimetype='text/html')
    
    ''' ##### Caterpillar Tracks ##### '''
    @app.route("/forward", methods=['GET', 'POST'])
    def forward():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goForward(moveOrder)
            print("Go Forward ...("+moveOrder+")")
        return reString
    
    @app.route("/backward", methods=['GET', 'POST'])
    def backward():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goBackward(moveOrder)
            print("Go Backward ...("+moveOrder+")")
        return reString
    
    @app.route("/turnleftback", methods=['GET', 'POST'])
    def turnleftback():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goTurnleftback(moveOrder)
            print("Go Turn Left back...("+moveOrder+")")
        return reString
    
    @app.route("/turnrightback", methods=['GET', 'POST'])
    def turnrightback():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goTurnrightback(moveOrder)
            print("Go Turn Right back...("+moveOrder+")")
        return reString
    
    @app.route("/turnleftforward", methods=['GET', 'POST'])
    def turnleftforward():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goTurnleftforward(moveOrder)
            print("Go Turn Left forward...("+moveOrder+")")
        return reString
    
    @app.route("/turnrightforward", methods=['GET', 'POST'])
    def turnrightforward():
        if request.method == 'POST':
            moveOrder = request.form['moveCnt']
            if not moveOrder :
                reString = "Do not move"
            else :
                reString = gpio.goTurnrightforward(moveOrder)
            print("Go Turn Right forward...("+moveOrder+")")
        return reString
            
    
    @app.route("/favorit.ico")
    def favorit_ico():
        logger.debug("Requested favorit.ico image")
        filename = "favorit.ico"
        return send_file(filename)
    
    
            
    if __name__=="__main__":
        # socketio.run(app,host="0.0.0.0",port="3005",threaded=True)
        parser = argparse.ArgumentParser()
        parser.add_argument('-p','--port',type=int,default=8081, help="Running port")
        parser.add_argument("-H","--host",type=str,default='0.0.0.0', help="Address to broadcast")
        args = parser.parse_args()
        logger.debug("Starting server")
            # When it change , it was reloaded
        #app.run(host=args.host,port=args.port)
        #app.run(host=args.host,port=args.port,debug=True,use_reloader=False)  
        socketio.run(app, log_output=True, host='0.0.0.0', port=8081, debug=True, use_reloader=False)