用Python从Arduino读取串行数据

2024-10-01 07:44:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在用MaxSonar EZ1 ultrasonic range sensor和Arduino Diecimila做一个小项目。在

使用MaxSonar playground code,我让Arduino每隔0.5秒编写一次序列号的英寸数,以及一个分隔符。监视串行数据时,输出类似于:

5.13.15.12.123.39.345...

在Python方面,我有一个基本的Flask应用程序,它有一个/distance路由,它返回一个JSON对象,该对象具有序列值:

^{pr2}$

以及索引.html是一个基本的网站,有一些JS,每半秒就有一个新的阅读内容进行投票。有了这个值,我应该能够构建一个有趣的UI,它根据我离声纳的距离而改变。在

$(document).ready(function() {

  window.GO = function() {

    this.frequency = 500; // .5 seconds

    this.init = function() {
      window.setInterval(this.update_distance, 500);
    }

    this.update_distance = function() {
      $.get('/distance', function(response) {
        var d = response.distance;
        $('#container').animate({"width": d + "%"});
      }, 'json')
    }
  }

  go = new GO();
  go.init();
});

问题

我遇到的问题是,不能保证当python从serial读取时,会有一个值。通常情况下,当它进行轮询时,我要么得到一个空值,要么得到一个部分值,而其他时候它是精确的。在

如何改变我的技术,使我能够一致地轮询串行数据,并从Arduino串行输出接收最后一个良好的读数?在


Tags: 数据对象goinitresponseupdatefunctionwindow
1条回答
网友
1楼 · 发布于 2024-10-01 07:44:13

你想把你的连读设置为在后台进行,而不是按需进行。您可以使用threadingQueue。一旦确定有一个有效的值,就可以将串行值添加到队列中,然后套接字调用就会从队列中拉出。会是这样的:

from flask import Flask
from flask import render_template
import serial
import json
import random

import threading, Queue

import logging
logging.basicConfig(filename=__file__.replace('.py','.log'),level=logging.DEBUG,format='%(asctime)s [%(name)s.%(funcName)s] %(levelname)s: %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', filemode='a')

class maxSonarSerialThread(threading.Thread):
  def __init__(self, dataQ, errQ, port=None, baudrate=None):
    self.logger = logging.getLogger('sonarSerialThread')
    self.logger.debug('initializing')
    threading.Thread.__init__(self)
    self.ser = serial.Serial()
    self.ser.timeout = 1
    if port is None:
      self.ser.port = "/dev/tty.usbserial-A6004amR"
    else:
      self.ser.port = port
    if baudrate is None:
      self.baudrate = 115200
    else:
      self.baudrate = baudrate
    #self.ser.flushInput()
    self.readCount = 0
    self.sleepDurSec = 5
    self.waitMaxSec = self.sleepDurSec * self.ser.baudrate / 10
    self.dataQ = dataQ
    self.errQ = errQ
    self.keepAlive = True
    self.stoprequest = threading.Event()
    self.setDaemon(True)
    self.dat = None
    self.inputStarted = False
    self.ver = ver

  def run(self):
    self.logger.debug('running')
    dataIn = False
    while not self.stoprequest.isSet():
      if not self.isOpen():
        self.connectForStream()

      while self.keepAlive:
        dat = self.ser.readline()
        //some data validation goes here before adding to Queue...
        self.dataQ.put(dat)
        if not self.inputStarted:
          self.logger.debug('reading')
        self.inputStarted = True
      self.dat.close()
      self.close()
      self.join_fin()

  def join_fin(self):
    self.logger.debug('stopping')
    self.stoprequest.set()

  def connectForStream(self, debug=True):
    '''Attempt to connect to the serial port and fail after waitMaxSec seconds'''
    self.logger.debug('connecting')
    if not self.isOpen():
      self.logger.debug('not open, trying to open')
      try:
        self.open()
      except serial.serialutil.SerialException:
        self.logger.debug('Unable to use port ' + str(self.ser.port) + ', please verify and try again')
        return
    while self.readline() == '' and self.readCount < self.waitMaxSec and self.keepAlive:
        self.logger.debug('reading initial')
        self.readCount += self.sleepDurSec
        if not self.readCount % (self.ser.baudrate / 100):
          self.logger.debug("Verifying MaxSonar data..")
          //some sanity check

    if self.readCount >= self.waitMaxSec:
        self.logger.debug('Unable to read from MaxSonar...')
        self.close()
        return False
    else:
      self.logger.debug('MaxSonar data is streaming...')

    return True

  def isOpen(self):
    self.logger.debug('Open? ' + str(self.ser.isOpen()))
    return self.ser.isOpen()

  def open(self):
    self.ser.open()

  def stopDataAquisition(self):
    self.logger.debug('Falsifying keepAlive')
    self.keepAlive = False

  def close(self):
    self.logger.debug('closing')
    self.stopDataAquisition()
    self.ser.close()

  def write(self, msg):
    self.ser.write(msg)

  def readline(self):
    return self.ser.readline()


app = Flask(__name__,
            static_folder="public",
            template_folder="templates")

port = "/dev/tty.usbserial-A6004amR"
dataQ = Queue.Queue()
errQ = Queue.Queue()
ser = maxSonarSerialThread(dataQ, errQ, port=port, ver=self.hwVersion)
ser.daemon = True
ser.start()

@app.route("/")
def index():
  return render_template('index.html')

@app.route("/distance")
def distance():
  distance = read_distance_from_serial()
  return json.dumps({'distance': distance})

def read_distance_from_serial():
  a = dataQ.get()
  print str(a)
  return a

您需要添加一个方法来连接线程以获得一个优雅的退出,但这应该能让您继续

相关问题 更多 >