load_cell: Load cell gram scale (#6729)
* Add gram scale features to load_cell * Convert sensor counts to grams and make this available via unix socket and object status * Basic GCodes for tearing and reading the load cell * Guided Calibration * Diagnostic gcode to check the health of the load cell * Update load_cell Documentation * Add API server load_cell/dump_force endpoint * Update [load_cell] config with calibration fields * Add G-Code commands for working with load cells * Add status reference for load_cell objects Signed-off-by: Gareth Farrington <gareth@waves.ky>
This commit is contained in:
parent
d886c1761b
commit
06d65ef5ac
@ -364,37 +364,21 @@ and might later produce asynchronous messages such as:
|
||||
The "header" field in the initial query response is used to describe
|
||||
the fields found in later "data" responses.
|
||||
|
||||
### hx71x/dump_hx71x
|
||||
### load_cell/dump_force
|
||||
|
||||
This endpoint is used to subscribe to raw HX711 and HX717 ADC data.
|
||||
Obtaining these low-level ADC updates may be useful for diagnostic
|
||||
and debugging purposes. Using this endpoint may increase Klipper's
|
||||
system load.
|
||||
This endpoint is used to subscribe to force data produced by a load_cell.
|
||||
Using this endpoint may increase Klipper's system load.
|
||||
|
||||
A request may look like:
|
||||
`{"id": 123, "method":"hx71x/dump_hx71x",
|
||||
`{"id": 123, "method":"load_cell/dump_force",
|
||||
"params": {"sensor": "load_cell", "response_template": {}}}`
|
||||
and might return:
|
||||
`{"id": 123,"result":{"header":["time","counts","value"]}}`
|
||||
`{"id": 123,"result":{"header":["time", "force (g)", "counts", "tare_counts"]}}`
|
||||
and might later produce asynchronous messages such as:
|
||||
`{"params":{"data":[[3292.432935, 562534, 0.067059278],
|
||||
[3292.4394937, 5625322, 0.670590639]]}}`
|
||||
`{"params":{"data":[[3292.432935, 40.65, 562534, -234467]]}}`
|
||||
|
||||
### ads1220/dump_ads1220
|
||||
|
||||
This endpoint is used to subscribe to raw ADS1220 ADC data.
|
||||
Obtaining these low-level ADC updates may be useful for diagnostic
|
||||
and debugging purposes. Using this endpoint may increase Klipper's
|
||||
system load.
|
||||
|
||||
A request may look like:
|
||||
`{"id": 123, "method":"ads1220/dump_ads1220",
|
||||
"params": {"sensor": "load_cell", "response_template": {}}}`
|
||||
and might return:
|
||||
`{"id": 123,"result":{"header":["time","counts","value"]}}`
|
||||
and might later produce asynchronous messages such as:
|
||||
`{"params":{"data":[[3292.432935, 562534, 0.067059278],
|
||||
[3292.4394937, 5625322, 0.670590639]]}}`
|
||||
The "header" field in the initial query response is used to describe
|
||||
the fields found in later "data" responses.
|
||||
|
||||
### pause_resume/cancel
|
||||
|
||||
|
@ -4758,6 +4758,16 @@ scale.
|
||||
[load_cell]
|
||||
sensor_type:
|
||||
# This must be one of the supported sensor types, see below.
|
||||
#counts_per_gram:
|
||||
# The floating point number of sensor counts that indicates 1 gram of force.
|
||||
# This value is calculated by the LOAD_CELL_CALIBRATE command.
|
||||
#reference_tare_counts:
|
||||
# The integer tare value, in raw sensor counts, taken when LOAD_CELL_CALIBRATE
|
||||
# is run. This is the default tare value when klipper starts up.
|
||||
#sensor_orientation:
|
||||
# Change the sensor's orientation. Can be either 'normal' or 'inverted'.
|
||||
# The default is 'normal'. Use 'inverted' if the sensor reports a
|
||||
# decreasing force value when placed under load.
|
||||
```
|
||||
|
||||
#### HX711
|
||||
|
@ -715,6 +715,40 @@ and RAW sensor value for calibration points.
|
||||
#### DISABLE_FILAMENT_WIDTH_LOG
|
||||
`DISABLE_FILAMENT_WIDTH_LOG`: Turn off diameter logging.
|
||||
|
||||
### [load_cell]
|
||||
|
||||
The following commands are enabled if a
|
||||
[load_cell config section](Config_Reference.md#load_cell) has been enabled.
|
||||
|
||||
### LOAD_CELL_DIAGNOSTIC
|
||||
`LOAD_CELL_DIAGNOSTIC [LOAD_CELL=<config_name>]`: This command collects 10
|
||||
seconds of load cell data and reports statistics that can help you verify proper
|
||||
operation of the load cell. This command can be run on both calibrated and
|
||||
uncalibrated load cells.
|
||||
|
||||
### LOAD_CELL_CALIBRATE
|
||||
`LOAD_CELL_CALIBRATE [LOAD_CELL=<config_name>]`: Start the guided calibration
|
||||
utility. Calibration is a 3 step process:
|
||||
1. First you remove all load from the load cell and run the `TARE` command
|
||||
1. Next you apply a known load to the load cell and run the
|
||||
`CALIBRATE GRAMS=nnn` command
|
||||
1. Finally use the `ACCEPT` command to save the results
|
||||
|
||||
You can cancel the calibration process at any time with `ABORT`.
|
||||
|
||||
### LOAD_CELL_TARE
|
||||
`LOAD_CELL_TARE [LOAD_CELL=<config_name>]`: This works just like the tare button
|
||||
on digital scale. It sets the current raw reading of the load cell to be the
|
||||
zero point reference value. The response is the percentage of the sensors range
|
||||
that was read and the raw value in counts.
|
||||
|
||||
### LOAD_CELL_READ load_cell="name"
|
||||
`LOAD_CELL_READ [LOAD_CELL=<config_name>]`:
|
||||
This command takes a reading from the load cell. The response is the percentage
|
||||
of the sensors range that was read and the raw value in counts. If the load cell
|
||||
is calibrated a force in grams is also reported.
|
||||
|
||||
|
||||
### [heaters]
|
||||
|
||||
The heaters module is automatically loaded if a heater is defined in
|
||||
|
122
docs/Load_Cell.md
Normal file
122
docs/Load_Cell.md
Normal file
@ -0,0 +1,122 @@
|
||||
# Load Cells
|
||||
|
||||
This document describes Klipper's support for load cells. Basic load cell
|
||||
functionality can be used to read force data and to weigh things like filament.
|
||||
A calibrated force sensor is an important part of a load cell based probe.
|
||||
|
||||
## Related Documentation
|
||||
|
||||
* [load_cell Config Reference](Config_Reference.md#load_cell)
|
||||
* [load_cell G-Code Commands](G-Codes.md#load_cell)
|
||||
* [load_cell Status Reference](Status_Reference.md#load_cell)
|
||||
|
||||
## Using `LOAD_CELL_DIAGNOSTIC`
|
||||
|
||||
When you first connect a load cell its good practice to check for issues by
|
||||
running `LOAD_CELL_DIAGNOSTIC`. This tool collects 10 seconds of data from the
|
||||
load cell and resport statistics:
|
||||
|
||||
```
|
||||
$ LOAD_CELL_DIAGNOSTIC
|
||||
// Collecting load cell data for 10 seconds...
|
||||
// Samples Collected: 3211
|
||||
// Measured samples per second: 332.0
|
||||
// Good samples: 3211, Saturated samples: 0, Unique values: 900
|
||||
// Sample range: [4.01% to 4.02%]
|
||||
// Sample range / sensor capacity: 0.00524%
|
||||
```
|
||||
|
||||
Things you can check with this data:
|
||||
* The configured sample rate of the sensor should be close to the 'Measured
|
||||
samples per second' value. If it is not you may have a configuration or wiring
|
||||
issue.
|
||||
* 'Saturated samples' should be 0. If you have saturated samples it means the
|
||||
load sell is seeing more force than it can measure.
|
||||
* 'Unique values' should be a large percentage of the 'Samples
|
||||
Collected' value. If 'Unique values' is 1 it is very likely a wiring issue.
|
||||
* Tap or push on the sensor while `LOAD_CELL_DIAGNOSTIC` runs. If
|
||||
things are working correctly ths should increase the 'Sample range'.
|
||||
|
||||
## Calibrating a Load Cell
|
||||
|
||||
Load cells are calibrated using the `LOAD_CELL_CALIBRATE` command. This is an
|
||||
interactive calibration utility that walks you though a 3 step process:
|
||||
1. First use the `TARE` command to establish the zero force value. This is the
|
||||
`reference_tare_counts` config value.
|
||||
2. Next you apply a known load or force to the load cell and run the
|
||||
`CALIBRATE GRAMS=nnn` command. From this the `counts_per_gram` value is
|
||||
calculated. See [the next section](#applying-a-known-force-or-load) for some
|
||||
suggestions on how to do this.
|
||||
3. Finally, use the `ACCEPT` command to save the results.
|
||||
|
||||
You can cancel the calibration process at any time with `ABORT`.
|
||||
|
||||
### Applying a Known Force or Load
|
||||
|
||||
The `CALIBRATE GRAMS=nnn` step can be accomplished in a number of ways. If your
|
||||
load cell is under a platform like a bed or filament holder it might be easiest
|
||||
to put a known mass on the platform. E.g. you could use a couple of 1KG filament
|
||||
spools.
|
||||
|
||||
If your load cell is in the printer's toolhead a different approach is easier.
|
||||
Put a digital scale on the printers bed and gently lower the toolhead onto the
|
||||
scale (or raise the bed into the toolhead if your bed moves). You may be able to
|
||||
do this using the `FORCE_MOVE` command. But more likely you will have to
|
||||
manually moving the z axis with the motors off until the toolhead presses on the
|
||||
scale.
|
||||
|
||||
A good calibration force would ideally be a large percentage of the load cell's
|
||||
rated capacity. E.g. if you have a 5Kg load cell you would ideally calibrate it
|
||||
with a 5kg mass. This might work well with under-bed sensors that have to
|
||||
support a lot of weight. For toolhead probes this may not be a load that your
|
||||
printer bed or toolhead can tolerate without damage. Do try to use at least 1Kg
|
||||
of force, most printers should tolerate this without issue.
|
||||
|
||||
When calibrating make careful note of the values reported:
|
||||
```
|
||||
$ CALIBRATE GRAMS=555
|
||||
// Calibration value: -2.78% (-59803108), Counts/gram: 73039.78739,
|
||||
Total capacity: +/- 29.14Kg
|
||||
```
|
||||
The `Total capacity` should be close to the theoretical rating of the load cell
|
||||
based on the sensor's capacity. If it is much larger you could have used a
|
||||
higher gain setting in the sensor or a more sensitive load cell. This isn't as
|
||||
critical for 32bit and 24bit sensors but is much more critical for low bit width
|
||||
sensors.
|
||||
|
||||
## Reading Force Data
|
||||
Force data can be read with a GCode command:
|
||||
|
||||
```
|
||||
LOAD_CELL_READ
|
||||
// 10.6g (1.94%)
|
||||
```
|
||||
|
||||
Data is also continuously read and can be consumed from the load_cell printer
|
||||
object in a macro:
|
||||
|
||||
```
|
||||
{% set grams = printer.load_cell.force_g %}
|
||||
```
|
||||
|
||||
This provides an average force over the last 1 second, similar to how
|
||||
temperature sensors work.
|
||||
|
||||
## Taring a Load Cell
|
||||
Taring, sometimes called zeroing, sets the current weight reported by the
|
||||
load_cell to 0. This is useful for measuring relative to a known weight. e.g.
|
||||
when measuring a filament spool, using `LOAD_CELL_TARE` sets the weight to 0.
|
||||
Then as filament is printed the load_cell will report the weight of the
|
||||
filament used.
|
||||
|
||||
```
|
||||
LOAD_CELL_TARE
|
||||
// Load cell tare value: 5.32% (445903)
|
||||
```
|
||||
|
||||
The current tare value is reported in the printers status and can be read in
|
||||
a macro:
|
||||
|
||||
```
|
||||
{% set tare_counts = printer.load_cell.tare_counts %}
|
||||
```
|
@ -101,3 +101,4 @@ communication with the Klipper developers.
|
||||
- [TSL1401CL filament width sensor](TSL1401CL_Filament_Width_Sensor.md)
|
||||
- [Hall filament width sensor](Hall_Filament_Width_Sensor.md)
|
||||
- [Eddy Current Inductive probe](Eddy_Probe.md)
|
||||
- [Load Cells](Load_Cell.md)
|
||||
|
@ -303,6 +303,17 @@ The following information is available for each `[led led_name]`,
|
||||
chain could be accessed at
|
||||
`printer["neopixel <config_name>"].color_data[1][2]`.
|
||||
|
||||
## load_cell
|
||||
|
||||
The following information is available for each `[load_cell name]`:
|
||||
- 'is_calibrated': True/False is the load cell calibrated
|
||||
- 'counts_per_gram': The number of raw sensor counts that equals 1 gram of force
|
||||
- 'reference_tare_counts': The reference number of raw sensor counts for 0 force
|
||||
- 'tare_counts': The current number of raw sensor counts for 0 force
|
||||
- 'force_g': The force in grams, averaged over the last polling period.
|
||||
- 'min_force_g': The minimum force in grams, over the last polling period.
|
||||
- 'max_force_g': The maximum force in grams, over the last polling period.
|
||||
|
||||
## manual_probe
|
||||
|
||||
The following information is available in the
|
||||
|
@ -141,4 +141,5 @@ nav:
|
||||
- TSL1401CL_Filament_Width_Sensor.md
|
||||
- Hall_Filament_Width_Sensor.md
|
||||
- Eddy_Probe.md
|
||||
- Load_Cell.md
|
||||
- Sponsors.md
|
||||
|
@ -95,10 +95,6 @@ class ADS1220:
|
||||
self.batch_bulk = bulk_sensor.BatchBulkHelper(
|
||||
self.printer, self._process_batch, self._start_measurements,
|
||||
self._finish_measurements, UPDATE_INTERVAL)
|
||||
# publish raw samples to the socket
|
||||
hdr = {'header': ('time', 'counts', 'value')}
|
||||
self.batch_bulk.add_mux_endpoint("ads1220/dump_ads1220", "sensor",
|
||||
self.name, hdr)
|
||||
# Command Configuration
|
||||
mcu.add_config_cmd(
|
||||
"config_ads1220 oid=%d spi_oid=%d data_ready_pin=%s"
|
||||
|
@ -51,10 +51,6 @@ class HX71xBase:
|
||||
self.batch_bulk = bulk_sensor.BatchBulkHelper(
|
||||
self.printer, self._process_batch, self._start_measurements,
|
||||
self._finish_measurements, UPDATE_INTERVAL)
|
||||
# publish raw samples to the socket
|
||||
dump_path = "%s/dump_%s" % (sensor_type, sensor_type)
|
||||
hdr = {'header': ('time', 'counts', 'value')}
|
||||
self.batch_bulk.add_mux_endpoint(dump_path, "sensor", self.name, hdr)
|
||||
# Command Configuration
|
||||
self.query_hx71x_cmd = None
|
||||
mcu.add_config_cmd(
|
||||
|
@ -3,21 +3,516 @@
|
||||
# Copyright (C) 2024 Gareth Farrington <gareth@waves.ky>
|
||||
#
|
||||
# This file may be distributed under the terms of the GNU GPLv3 license.
|
||||
|
||||
from . import hx71x
|
||||
from . import ads1220
|
||||
from .bulk_sensor import BatchWebhooksClient
|
||||
import collections, itertools
|
||||
# We want either Python 3's zip() or Python 2's izip() but NOT 2's zip():
|
||||
zip_impl = zip
|
||||
try:
|
||||
from itertools import izip as zip_impl # python 2.x izip
|
||||
except ImportError: # will be Python 3.x
|
||||
pass
|
||||
|
||||
# Printer class that controls a load cell
|
||||
# alternative to numpy's column selection:
|
||||
def select_column(data, column_idx):
|
||||
return list(zip_impl(*data))[column_idx]
|
||||
|
||||
def avg(data):
|
||||
return sum(data) / len(data)
|
||||
|
||||
# Helper for event driven webhooks and subscription based API clients
|
||||
class ApiClientHelper(object):
|
||||
def __init__(self, printer):
|
||||
self.printer = printer
|
||||
self.client_cbs = []
|
||||
self.webhooks_start_resp = {}
|
||||
|
||||
# send data to clients
|
||||
def send(self, msg):
|
||||
for client_cb in list(self.client_cbs):
|
||||
res = client_cb(msg)
|
||||
if not res:
|
||||
# This client no longer needs updates - unregister it
|
||||
self.client_cbs.remove(client_cb)
|
||||
|
||||
# Add a client that gets data callbacks
|
||||
def add_client(self, client_cb):
|
||||
self.client_cbs.append(client_cb)
|
||||
|
||||
# Add Webhooks client and send header
|
||||
def _add_webhooks_client(self, web_request):
|
||||
whbatch = BatchWebhooksClient(web_request)
|
||||
self.add_client(whbatch.handle_batch)
|
||||
web_request.send(self.webhooks_start_resp)
|
||||
|
||||
# Set up a webhooks endpoint with a static header
|
||||
def add_mux_endpoint(self, path, key, value, webhooks_start_resp):
|
||||
self.webhooks_start_resp = webhooks_start_resp
|
||||
wh = self.printer.lookup_object('webhooks')
|
||||
wh.register_mux_endpoint(path, key, value, self._add_webhooks_client)
|
||||
|
||||
# Class for handling commands related ot load cells
|
||||
class LoadCellCommandHelper:
|
||||
def __init__(self, config, load_cell):
|
||||
self.printer = config.get_printer()
|
||||
self.load_cell = load_cell
|
||||
name_parts = config.get_name().split()
|
||||
self.name = name_parts[-1]
|
||||
self.register_commands(self.name)
|
||||
if len(name_parts) == 1:
|
||||
self.register_commands(None)
|
||||
|
||||
def register_commands(self, name):
|
||||
# Register commands
|
||||
gcode = self.printer.lookup_object('gcode')
|
||||
gcode.register_mux_command("LOAD_CELL_TARE", "LOAD_CELL", name,
|
||||
self.cmd_LOAD_CELL_TARE,
|
||||
desc=self.cmd_LOAD_CELL_TARE_help)
|
||||
gcode.register_mux_command("LOAD_CELL_CALIBRATE", "LOAD_CELL", name,
|
||||
self.cmd_LOAD_CELL_CALIBRATE,
|
||||
desc=self.cmd_CALIBRATE_LOAD_CELL_help)
|
||||
gcode.register_mux_command("LOAD_CELL_READ", "LOAD_CELL", name,
|
||||
self.cmd_LOAD_CELL_READ,
|
||||
desc=self.cmd_LOAD_CELL_READ_help)
|
||||
gcode.register_mux_command("LOAD_CELL_DIAGNOSTIC", "LOAD_CELL", name,
|
||||
self.cmd_LOAD_CELL_DIAGNOSTIC,
|
||||
desc=self.cmd_LOAD_CELL_DIAGNOSTIC_help)
|
||||
|
||||
cmd_LOAD_CELL_TARE_help = "Set the Zero point of the load cell"
|
||||
def cmd_LOAD_CELL_TARE(self, gcmd):
|
||||
tare_counts = self.load_cell.avg_counts()
|
||||
self.load_cell.tare(tare_counts)
|
||||
tare_percent = self.load_cell.counts_to_percent(tare_counts)
|
||||
gcmd.respond_info("Load cell tare value: %.2f%% (%i)"
|
||||
% (tare_percent, tare_counts))
|
||||
|
||||
cmd_CALIBRATE_LOAD_CELL_help = "Start interactive calibration tool"
|
||||
def cmd_LOAD_CELL_CALIBRATE(self, gcmd):
|
||||
LoadCellGuidedCalibrationHelper(self.printer, self.load_cell)
|
||||
|
||||
cmd_LOAD_CELL_READ_help = "Take a reading from the load cell"
|
||||
def cmd_LOAD_CELL_READ(self, gcmd):
|
||||
counts = self.load_cell.avg_counts()
|
||||
percent = self.load_cell.counts_to_percent(counts)
|
||||
force = self.load_cell.counts_to_grams(counts)
|
||||
if percent >= 100 or percent <= -100:
|
||||
gcmd.respond_info("Err (%.2f%%)" % (percent,))
|
||||
if force is None:
|
||||
gcmd.respond_info("---.-g (%.2f%%)" % (percent,))
|
||||
else:
|
||||
gcmd.respond_info("%.1fg (%.2f%%)" % (force, percent))
|
||||
|
||||
cmd_LOAD_CELL_DIAGNOSTIC_help = "Check the health of the load cell"
|
||||
def cmd_LOAD_CELL_DIAGNOSTIC(self, gcmd):
|
||||
gcmd.respond_info("Collecting load cell data for 10 seconds...")
|
||||
collector = self.load_cell.get_collector()
|
||||
reactor = self.printer.get_reactor()
|
||||
collector.start_collecting()
|
||||
reactor.pause(reactor.monotonic() + 10.)
|
||||
samples, errors = collector.stop_collecting()
|
||||
if errors:
|
||||
gcmd.respond_info("Sensor reported errors: %i errors,"
|
||||
" %i overflows" % (errors[0], errors[1]))
|
||||
else:
|
||||
gcmd.respond_info("Sensor reported no errors")
|
||||
if not samples:
|
||||
raise gcmd.error("No samples returned from sensor!")
|
||||
counts = select_column(samples, 2)
|
||||
range_min, range_max = self.load_cell.saturation_range()
|
||||
good_count = 0
|
||||
saturation_count = 0
|
||||
for sample in counts:
|
||||
if sample >= range_max or sample <= range_min:
|
||||
saturation_count += 1
|
||||
else:
|
||||
good_count += 1
|
||||
gcmd.respond_info("Samples Collected: %i" % (len(samples)))
|
||||
if len(samples) > 2:
|
||||
sensor_sps = self.load_cell.sensor.get_samples_per_second()
|
||||
sps = float(len(samples)) / (samples[-1][0] - samples[0][0])
|
||||
gcmd.respond_info("Measured samples per second: %.1f, "
|
||||
"configured: %.1f" % (sps, sensor_sps))
|
||||
gcmd.respond_info("Good samples: %i, Saturated samples: %i, Unique"
|
||||
" values: %i" % (good_count, saturation_count,
|
||||
len(set(counts))))
|
||||
max_pct = self.load_cell.counts_to_percent(max(counts))
|
||||
min_pct = self.load_cell.counts_to_percent(min(counts))
|
||||
gcmd.respond_info("Sample range: [%.2f%% to %.2f%%]"
|
||||
% (min_pct, max_pct))
|
||||
gcmd.respond_info("Sample range / sensor capacity: %.5f%%"
|
||||
% ((max_pct - min_pct) / 2.))
|
||||
|
||||
# Class to guide the user through calibrating a load cell
|
||||
class LoadCellGuidedCalibrationHelper:
|
||||
def __init__(self, printer, load_cell):
|
||||
self.printer = printer
|
||||
self.gcode = printer.lookup_object('gcode')
|
||||
self.load_cell = load_cell
|
||||
self._tare_counts = self._counts_per_gram = None
|
||||
self.tare_percent = 0.
|
||||
self.register_commands()
|
||||
self.gcode.respond_info(
|
||||
"Starting load cell calibration. \n"
|
||||
"1.) Remove all load and run TARE. \n"
|
||||
"2.) Apply a known load, run CALIBRATE GRAMS=nnn. \n"
|
||||
"Complete calibration with the ACCEPT command.\n"
|
||||
"Use the ABORT command to quit.")
|
||||
|
||||
def verify_no_active_calibration(self,):
|
||||
try:
|
||||
self.gcode.register_command('TARE', 'dummy')
|
||||
except self.printer.config_error as e:
|
||||
raise self.gcode.error(
|
||||
"Already Calibrating a Load Cell. Use ABORT to quit.")
|
||||
self.gcode.register_command('TARE', None)
|
||||
|
||||
def register_commands(self):
|
||||
self.verify_no_active_calibration()
|
||||
register_command = self.gcode.register_command
|
||||
register_command("ABORT", self.cmd_ABORT, desc=self.cmd_ABORT_help)
|
||||
register_command("ACCEPT", self.cmd_ACCEPT, desc=self.cmd_ACCEPT_help)
|
||||
register_command("TARE", self.cmd_TARE, desc=self.cmd_TARE_help)
|
||||
register_command("CALIBRATE", self.cmd_CALIBRATE,
|
||||
desc=self.cmd_CALIBRATE_help)
|
||||
|
||||
# convert the delta of counts to a counts/gram metric
|
||||
def counts_per_gram(self, grams, cal_counts):
|
||||
return float(abs(int(self._tare_counts - cal_counts))) / grams
|
||||
|
||||
# calculate max force that the load cell can register
|
||||
# given tare bias, at saturation in kilograms
|
||||
def capacity_kg(self, counts_per_gram):
|
||||
range_min, range_max = self.load_cell.saturation_range()
|
||||
return (int((range_max - abs(self._tare_counts)) / counts_per_gram)
|
||||
/ 1000.)
|
||||
|
||||
def finalize(self, save_results=False):
|
||||
for name in ['ABORT', 'ACCEPT', 'TARE', 'CALIBRATE']:
|
||||
self.gcode.register_command(name, None)
|
||||
if not save_results:
|
||||
self.gcode.respond_info("Load cell calibration aborted")
|
||||
return
|
||||
if self._counts_per_gram is None or self._tare_counts is None:
|
||||
self.gcode.respond_info("Calibration process is incomplete, "
|
||||
"aborting")
|
||||
self.load_cell.set_calibration(self._counts_per_gram, self._tare_counts)
|
||||
self.gcode.respond_info("Load cell calibration settings:\n\n"
|
||||
"counts_per_gram: %.6f\n"
|
||||
"reference_tare_counts: %i\n\n"
|
||||
"The SAVE_CONFIG command will update the printer config file"
|
||||
" with the above and restart the printer."
|
||||
% (self._counts_per_gram, self._tare_counts))
|
||||
self.load_cell.tare(self._tare_counts)
|
||||
|
||||
cmd_ABORT_help = "Abort load cell calibration tool"
|
||||
def cmd_ABORT(self, gcmd):
|
||||
self.finalize(False)
|
||||
|
||||
cmd_ACCEPT_help = "Accept calibration results and apply to load cell"
|
||||
def cmd_ACCEPT(self, gcmd):
|
||||
self.finalize(True)
|
||||
|
||||
cmd_TARE_help = "Tare the load cell"
|
||||
def cmd_TARE(self, gcmd):
|
||||
self._tare_counts = self.load_cell.avg_counts()
|
||||
self._counts_per_gram = None # require re-calibration on tare
|
||||
self.tare_percent = self.load_cell.counts_to_percent(self._tare_counts)
|
||||
gcmd.respond_info("Load cell tare value: %.2f%% (%i)"
|
||||
% (self.tare_percent, self._tare_counts))
|
||||
if self.tare_percent > 2.:
|
||||
gcmd.respond_info(
|
||||
"WARNING: tare value is more than 2% away from 0!\n"
|
||||
"The load cell's range will be impacted.\n"
|
||||
"Check for external force on the load cell.")
|
||||
gcmd.respond_info("Now apply a known force to the load cell and enter \
|
||||
the force value with:\n CALIBRATE GRAMS=nnn")
|
||||
|
||||
cmd_CALIBRATE_help = "Enter the load cell value in grams"
|
||||
def cmd_CALIBRATE(self, gcmd):
|
||||
if self._tare_counts is None:
|
||||
gcmd.respond_info("You must use TARE first.")
|
||||
return
|
||||
grams = gcmd.get_float("GRAMS", minval=50., maxval=25000.)
|
||||
cal_counts = self.load_cell.avg_counts()
|
||||
cal_percent = self.load_cell.counts_to_percent(cal_counts)
|
||||
c_per_g = self.counts_per_gram(grams, cal_counts)
|
||||
cap_kg = self.capacity_kg(c_per_g)
|
||||
gcmd.respond_info("Calibration value: %.2f%% (%i), Counts/gram: %.5f, \
|
||||
Total capacity: +/- %0.2fKg"
|
||||
% (cal_percent, cal_counts, c_per_g, cap_kg))
|
||||
range_min, range_max = self.load_cell.saturation_range()
|
||||
if cal_counts >= range_max or cal_counts <= range_min:
|
||||
raise self.printer.command_error(
|
||||
"ERROR: Sensor is saturated with too much load!\n"
|
||||
"Use less force to calibrate the load cell.")
|
||||
if cal_counts == self._tare_counts:
|
||||
raise self.printer.command_error(
|
||||
"ERROR: Tare and Calibration readings are the same!\n"
|
||||
"Check wiring and validate sensor with READ_LOAD_CELL command.")
|
||||
if (abs(cal_percent - self.tare_percent)) < 1.:
|
||||
raise self.printer.command_error(
|
||||
"ERROR: Tare and Calibration readings are less than 1% "
|
||||
"different!\n"
|
||||
"Use more force when calibrating or a higher sensor gain.")
|
||||
# only set _counts_per_gram after all errors are raised
|
||||
self._counts_per_gram = c_per_g
|
||||
if cap_kg < 1.:
|
||||
gcmd.respond_info("WARNING: Load cell capacity is less than 1kg!\n"
|
||||
"Check wiring and consider using a lower sensor gain.")
|
||||
if cap_kg > 25.:
|
||||
gcmd.respond_info("WARNING: Load cell capacity is more than 25Kg!\n"
|
||||
"Check wiring and consider using a higher sensor gain.")
|
||||
gcmd.respond_info("Accept calibration with the ACCEPT command.")
|
||||
|
||||
|
||||
# Utility to collect some samples from the LoadCell for later analysis
|
||||
# Optionally blocks execution while collecting with reactor.pause()
|
||||
# can collect a minimum n samples or collect until a specific print_time
|
||||
# samples returned in [[time],[force],[counts]] arrays for easy processing
|
||||
RETRY_DELAY = 0.05 # 20Hz
|
||||
class LoadCellSampleCollector:
|
||||
def __init__(self, printer, load_cell):
|
||||
self._printer = printer
|
||||
self._load_cell = load_cell
|
||||
self._reactor = printer.get_reactor()
|
||||
self._mcu = load_cell.sensor.get_mcu()
|
||||
self.min_time = 0.
|
||||
self.max_time = float("inf")
|
||||
self.min_count = float("inf") # In Python 3.5 math.inf is better
|
||||
self.is_started = False
|
||||
self._samples = []
|
||||
self._errors = 0
|
||||
self._overflows = 0
|
||||
|
||||
def _on_samples(self, msg):
|
||||
if not self.is_started:
|
||||
return False # already stopped, ignore
|
||||
self._errors += msg['errors']
|
||||
self._overflows += msg['overflows']
|
||||
samples = msg['data']
|
||||
for sample in samples:
|
||||
time = sample[0]
|
||||
if self.min_time <= time <= self.max_time:
|
||||
self._samples.append(sample)
|
||||
if time > self.max_time:
|
||||
self.is_started = False
|
||||
if len(self._samples) >= self.min_count:
|
||||
self.is_started = False
|
||||
return self.is_started
|
||||
|
||||
def _finish_collecting(self):
|
||||
self.is_started = False
|
||||
self.min_time = 0.
|
||||
self.max_time = float("inf")
|
||||
self.min_count = float("inf") # In Python 3.5 math.inf is better
|
||||
samples = self._samples
|
||||
self._samples = []
|
||||
errors = self._errors
|
||||
self._errors = 0
|
||||
overflows = self._overflows
|
||||
self._overflows = 0
|
||||
return samples, (errors, overflows) if errors or overflows else 0
|
||||
|
||||
def _collect_until(self, timeout):
|
||||
self.start_collecting()
|
||||
while self.is_started:
|
||||
now = self._reactor.monotonic()
|
||||
if self._mcu.estimated_print_time(now) > timeout:
|
||||
self._finish_collecting()
|
||||
raise self._printer.command_error(
|
||||
"LoadCellSampleCollector timed out! Errors: %i,"
|
||||
" Overflows: %i" % (self._errors, self._overflows))
|
||||
self._reactor.pause(now + RETRY_DELAY)
|
||||
return self._finish_collecting()
|
||||
|
||||
# start collecting with no automatic end to collection
|
||||
def start_collecting(self, min_time=None):
|
||||
if self.is_started:
|
||||
return
|
||||
self.min_time = min_time if min_time is not None else self.min_time
|
||||
self.is_started = True
|
||||
self._load_cell.add_client(self._on_samples)
|
||||
|
||||
# stop collecting immediately and return results
|
||||
def stop_collecting(self):
|
||||
return self._finish_collecting()
|
||||
|
||||
# block execution until at least min_count samples are collected
|
||||
# will return all samples collected, not just up to min_count
|
||||
def collect_min(self, min_count=1):
|
||||
self.min_count = min_count
|
||||
if len(self._samples) >= min_count:
|
||||
return self._finish_collecting()
|
||||
print_time = self._mcu.estimated_print_time(self._reactor.monotonic())
|
||||
start_time = max(print_time, self.min_time)
|
||||
sps = self._load_cell.sensor.get_samples_per_second()
|
||||
return self._collect_until(start_time + 1. + (min_count / sps))
|
||||
|
||||
# returns when a sample is collected with a timestamp after print_time
|
||||
def collect_until(self, print_time=None):
|
||||
self.max_time = print_time
|
||||
if len(self._samples) and self._samples[-1][0] >= print_time:
|
||||
return self._finish_collecting()
|
||||
return self._collect_until(self.max_time + 1.)
|
||||
|
||||
# Printer class that controls the load cell
|
||||
MIN_COUNTS_PER_GRAM = 1.
|
||||
class LoadCell:
|
||||
def __init__(self, config, sensor):
|
||||
self.printer = printer = config.get_printer()
|
||||
self.sensor = sensor # must implement BulkAdcSensor
|
||||
self.config_name = config.get_name()
|
||||
self.name = config.get_name().split()[-1]
|
||||
self.sensor = sensor # must implement BulkSensorAdc
|
||||
buffer_size = sensor.get_samples_per_second() // 2
|
||||
self._force_buffer = collections.deque(maxlen=buffer_size)
|
||||
self.reference_tare_counts = config.getint('reference_tare_counts',
|
||||
default=None)
|
||||
self.tare_counts = self.reference_tare_counts
|
||||
self.counts_per_gram = config.getfloat('counts_per_gram',
|
||||
minval=MIN_COUNTS_PER_GRAM, default=None)
|
||||
self.invert = config.getchoice('sensor_orientation',
|
||||
{'normal': 1., 'inverted': -1.}, default="normal")
|
||||
LoadCellCommandHelper(config, self)
|
||||
# Client support:
|
||||
self.clients = ApiClientHelper(printer)
|
||||
header = {"header": ["time", "force (g)", "counts", "tare_counts"]}
|
||||
self.clients.add_mux_endpoint("load_cell/dump_force",
|
||||
"load_cell", self.name, header)
|
||||
# startup, when klippy is ready, start capturing data
|
||||
printer.register_event_handler("klippy:ready", self._handle_ready)
|
||||
|
||||
def _on_sample(self, msg):
|
||||
def _handle_ready(self):
|
||||
self.sensor.add_client(self._sensor_data_event)
|
||||
self.add_client(self._track_force)
|
||||
# announce calibration status on ready
|
||||
if self.is_calibrated():
|
||||
self.printer.send_event("load_cell:calibrate", self)
|
||||
if self.is_tared():
|
||||
self.printer.send_event("load_cell:tare", self)
|
||||
|
||||
# convert raw counts to grams and broadcast to clients
|
||||
def _sensor_data_event(self, msg):
|
||||
data = msg.get("data")
|
||||
errors = msg.get("errors")
|
||||
overflows = msg.get("overflows")
|
||||
if data is None:
|
||||
return None
|
||||
samples = []
|
||||
for row in data:
|
||||
# [time, grams, counts, tare_counts]
|
||||
samples.append([row[0], self.counts_to_grams(row[1]), row[1],
|
||||
self.tare_counts])
|
||||
msg = {'data': samples, 'errors': errors, 'overflows': overflows}
|
||||
self.clients.send(msg)
|
||||
return True
|
||||
|
||||
# get internal events of force data
|
||||
def add_client(self, callback):
|
||||
self.clients.add_client(callback)
|
||||
|
||||
def tare(self, tare_counts):
|
||||
self.tare_counts = int(tare_counts)
|
||||
self.printer.send_event("load_cell:tare", self)
|
||||
|
||||
def set_calibration(self, counts_per_gram, tare_counts):
|
||||
if (counts_per_gram is None
|
||||
or abs(counts_per_gram) < MIN_COUNTS_PER_GRAM):
|
||||
raise self.printer.command_error("Invalid counts per gram value")
|
||||
if tare_counts is None:
|
||||
raise self.printer.command_error("Missing tare counts")
|
||||
self.counts_per_gram = counts_per_gram
|
||||
self.reference_tare_counts = int(tare_counts)
|
||||
configfile = self.printer.lookup_object('configfile')
|
||||
configfile.set(self.config_name, 'counts_per_gram',
|
||||
"%.5f" % (self.counts_per_gram,))
|
||||
configfile.set(self.config_name, 'reference_tare_counts',
|
||||
"%i" % (self.reference_tare_counts,))
|
||||
self.printer.send_event("load_cell:calibrate", self)
|
||||
|
||||
def counts_to_grams(self, sample):
|
||||
if not self.is_calibrated() or not self.is_tared():
|
||||
return None
|
||||
sample_delta = float(sample - self.tare_counts)
|
||||
return self.invert * (sample_delta / self.counts_per_gram)
|
||||
|
||||
# The maximum range of the sensor based on its bit width
|
||||
def saturation_range(self):
|
||||
return self.sensor.get_range()
|
||||
|
||||
# convert raw counts to a +/- percentage of the sensors range
|
||||
def counts_to_percent(self, counts):
|
||||
range_min, range_max = self.saturation_range()
|
||||
return (float(counts) / float(range_max)) * 100.
|
||||
|
||||
# read 1 second of load cell data and average it
|
||||
# performs safety checks for saturation
|
||||
def avg_counts(self, num_samples=None):
|
||||
if num_samples is None:
|
||||
num_samples = self.sensor.get_samples_per_second()
|
||||
samples, errors = self.get_collector().collect_min(num_samples)
|
||||
if errors:
|
||||
raise self.printer.command_error(
|
||||
"Sensor reported %i errors while sampling"
|
||||
% (errors[0] + errors[1]))
|
||||
# check samples for saturated readings
|
||||
range_min, range_max = self.saturation_range()
|
||||
for sample in samples:
|
||||
if sample[2] >= range_max or sample[2] <= range_min:
|
||||
raise self.printer.command_error(
|
||||
"Some samples are saturated (+/-100%)")
|
||||
return avg(select_column(samples, 2))
|
||||
|
||||
# Provide ongoing force tracking/averaging for status updates
|
||||
def _track_force(self, msg):
|
||||
if not (self.is_calibrated() and self.is_tared()):
|
||||
return True
|
||||
samples = msg['data']
|
||||
# selectColumn unusable here because Python 2 lacks deque.extend
|
||||
for sample in samples:
|
||||
self._force_buffer.append(sample[1])
|
||||
return True
|
||||
|
||||
def _force_g(self):
|
||||
if (self.is_calibrated() and self.is_tared()
|
||||
and len(self._force_buffer) > 0):
|
||||
return {"force_g": round(avg(self._force_buffer), 1),
|
||||
"min_force_g": round(min(self._force_buffer), 1),
|
||||
"max_force_g": round(max(self._force_buffer), 1)}
|
||||
return {}
|
||||
|
||||
def is_tared(self):
|
||||
return self.tare_counts is not None
|
||||
|
||||
def is_calibrated(self):
|
||||
return (self.counts_per_gram is not None
|
||||
and self.reference_tare_counts is not None)
|
||||
|
||||
def get_sensor(self):
|
||||
return self.sensor
|
||||
|
||||
def get_reference_tare_counts(self):
|
||||
return self.reference_tare_counts
|
||||
|
||||
def get_tare_counts(self):
|
||||
return self.tare_counts
|
||||
|
||||
def get_counts_per_gram(self):
|
||||
return self.counts_per_gram
|
||||
|
||||
def get_collector(self):
|
||||
return LoadCellSampleCollector(self.printer, self)
|
||||
|
||||
def get_status(self, eventtime):
|
||||
status = self._force_g()
|
||||
status.update({'is_calibrated': self.is_calibrated(),
|
||||
'counts_per_gram': self.counts_per_gram,
|
||||
'reference_tare_counts': self.reference_tare_counts,
|
||||
'tare_counts': self.tare_counts})
|
||||
return status
|
||||
|
||||
|
||||
def load_config(config):
|
||||
# Sensor types
|
||||
sensors = {}
|
||||
|
Loading…
x
Reference in New Issue
Block a user