mirror of https://github.com/Desuuuu/klipper.git
data_logger: Flush the initial index file when all queries complete
This should avoid zero byte index files for small captures. Signed-off-by: Kevin O'Connor <kevin@koconnor.net>
This commit is contained in:
parent
8e1929649f
commit
7aa2c11b3a
|
@ -63,19 +63,14 @@ class DataLogger:
|
||||||
self.logger = LogWriter(log_prefix + ".json.gz")
|
self.logger = LogWriter(log_prefix + ".json.gz")
|
||||||
self.index = LogWriter(log_prefix + ".index.gz")
|
self.index = LogWriter(log_prefix + ".index.gz")
|
||||||
# Handlers
|
# Handlers
|
||||||
self.query_handlers = {
|
self.query_handlers = {}
|
||||||
"info": self.handle_info, "list": self.handle_list,
|
self.async_handlers = {}
|
||||||
"status": self.handle_subscribe,
|
|
||||||
}
|
|
||||||
self.async_handlers = {
|
|
||||||
"status": self.handle_async_db,
|
|
||||||
}
|
|
||||||
# get_status databasing
|
# get_status databasing
|
||||||
self.db = {"status": {}}
|
self.db = {}
|
||||||
self.next_index_time = 0.
|
self.next_index_time = 0.
|
||||||
# Start login process
|
# Start login process
|
||||||
self.send_msg({"id": "info", "method": "info",
|
self.send_query("info", "info", {"client_info": ClientInfo},
|
||||||
"params": { "client_info": ClientInfo }})
|
self.handle_info)
|
||||||
def error(self, msg):
|
def error(self, msg):
|
||||||
sys.stderr.write(msg + "\n")
|
sys.stderr.write(msg + "\n")
|
||||||
def finish(self, msg):
|
def finish(self, msg):
|
||||||
|
@ -84,7 +79,9 @@ class DataLogger:
|
||||||
self.index.close()
|
self.index.close()
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
# Unix Domain Socket IO
|
# Unix Domain Socket IO
|
||||||
def send_msg(self, msg):
|
def send_query(self, msg_id, method, params, cb):
|
||||||
|
self.query_handlers[msg_id] = cb
|
||||||
|
msg = {"id": msg_id, "method": method, "params": params}
|
||||||
cm = json.dumps(msg, separators=(',', ':')).encode()
|
cm = json.dumps(msg, separators=(',', ':')).encode()
|
||||||
self.webhook_socket.send(cm + b"\x03")
|
self.webhook_socket.send(cm + b"\x03")
|
||||||
def process_socket(self):
|
def process_socket(self):
|
||||||
|
@ -110,7 +107,10 @@ class DataLogger:
|
||||||
msg_id = msg.get("id")
|
msg_id = msg.get("id")
|
||||||
hdl = self.query_handlers.get(msg_id)
|
hdl = self.query_handlers.get(msg_id)
|
||||||
if hdl is not None:
|
if hdl is not None:
|
||||||
|
del self.query_handlers[msg_id]
|
||||||
hdl(msg, part)
|
hdl(msg, part)
|
||||||
|
if not self.query_handlers:
|
||||||
|
self.flush_index()
|
||||||
continue
|
continue
|
||||||
self.error("ERROR: Message with unknown id")
|
self.error("ERROR: Message with unknown id")
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -123,52 +123,49 @@ class DataLogger:
|
||||||
except KeyboardInterrupt as e:
|
except KeyboardInterrupt as e:
|
||||||
self.finish("Keyboard Interrupt")
|
self.finish("Keyboard Interrupt")
|
||||||
# Query response handlers
|
# Query response handlers
|
||||||
|
def send_subscribe(self, msg_id, method, params, cb=None, async_cb=None):
|
||||||
|
if cb is None:
|
||||||
|
cb = self.handle_dump
|
||||||
|
if async_cb is not None:
|
||||||
|
self.async_handlers[msg_id] = async_cb
|
||||||
|
params["response_template"] = {"q": msg_id}
|
||||||
|
self.send_query(msg_id, method, params, cb)
|
||||||
def handle_info(self, msg, raw_msg):
|
def handle_info(self, msg, raw_msg):
|
||||||
if msg["result"]["state"] != "ready":
|
if msg["result"]["state"] != "ready":
|
||||||
self.finish("Klipper not in ready state")
|
self.finish("Klipper not in ready state")
|
||||||
self.send_msg({"id": "list", "method": "objects/list"})
|
self.send_query("list", "objects/list", {}, self.handle_list)
|
||||||
def handle_list(self, msg, raw_msg):
|
def handle_list(self, msg, raw_msg):
|
||||||
subreq = {o: None for o in msg["result"]["objects"]}
|
subreq = {o: None for o in msg["result"]["objects"]}
|
||||||
self.send_msg({"id": "status", "method": "objects/subscribe",
|
self.send_subscribe("status", "objects/subscribe", {"objects": subreq},
|
||||||
"params": { "objects": subreq,
|
self.handle_subscribe, self.handle_async_db)
|
||||||
"response_template": {"q": "status"}}})
|
|
||||||
def handle_subscribe(self, msg, raw_msg):
|
def handle_subscribe(self, msg, raw_msg):
|
||||||
result = msg["result"]
|
result = msg["result"]
|
||||||
self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME
|
self.next_index_time = result["eventtime"] + INDEX_UPDATE_TIME
|
||||||
status = result["status"]
|
self.db["status"] = status = result["status"]
|
||||||
self.db["status"].update(status)
|
# Subscribe to trapq and stepper queue updates
|
||||||
motion_report = status.get("motion_report")
|
motion_report = status.get("motion_report", {})
|
||||||
if motion_report is not None:
|
|
||||||
for trapq in motion_report.get("trapq", []):
|
for trapq in motion_report.get("trapq", []):
|
||||||
qname = "trapq:" + trapq
|
self.send_subscribe("trapq:" + trapq, "motion_report/dump_trapq",
|
||||||
self.query_handlers[qname] = self.handle_dump
|
{"name": trapq})
|
||||||
self.send_msg({"id": qname,
|
|
||||||
"method": "motion_report/dump_trapq",
|
|
||||||
"params": { "name": trapq,
|
|
||||||
"response_template": {"q": qname}}})
|
|
||||||
for stepper in motion_report.get("steppers", []):
|
for stepper in motion_report.get("steppers", []):
|
||||||
qname = "stepq:" + stepper
|
self.send_subscribe("stepq:" + stepper,
|
||||||
self.query_handlers[qname] = self.handle_dump
|
"motion_report/dump_stepper", {"name": stepper})
|
||||||
self.send_msg({"id": qname,
|
|
||||||
"method": "motion_report/dump_stepper",
|
|
||||||
"params": { "name": stepper,
|
|
||||||
"response_template": {"q": qname}}})
|
|
||||||
def handle_dump(self, msg, raw_msg):
|
def handle_dump(self, msg, raw_msg):
|
||||||
msg_id = msg["id"]
|
msg_id = msg["id"]
|
||||||
self.db.setdefault("subscriptions", {})[msg_id] = msg["result"]
|
self.db.setdefault("subscriptions", {})[msg_id] = msg["result"]
|
||||||
|
def flush_index(self):
|
||||||
|
self.db['file_position'] = self.logger.flush()
|
||||||
|
self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
|
||||||
|
self.db = {"status": {}}
|
||||||
def handle_async_db(self, msg, raw_msg):
|
def handle_async_db(self, msg, raw_msg):
|
||||||
params = msg["params"]
|
params = msg["params"]
|
||||||
db_status = self.db['status']
|
db_status = self.db['status']
|
||||||
for k, v in params.get("status", {}).items():
|
for k, v in params.get("status", {}).items():
|
||||||
db_status.setdefault(k, {}).update(v)
|
db_status.setdefault(k, {}).update(v)
|
||||||
eventtime = params['eventtime']
|
eventtime = params['eventtime']
|
||||||
if eventtime < self.next_index_time:
|
if eventtime >= self.next_index_time:
|
||||||
return
|
|
||||||
# Update index file
|
|
||||||
self.next_index_time = eventtime + INDEX_UPDATE_TIME
|
self.next_index_time = eventtime + INDEX_UPDATE_TIME
|
||||||
self.db['file_position'] = self.logger.flush()
|
self.flush_index()
|
||||||
self.index.add_data(json.dumps(self.db, separators=(',', ':')).encode())
|
|
||||||
self.db = {"status": {}}
|
|
||||||
|
|
||||||
def nice():
|
def nice():
|
||||||
try:
|
try:
|
||||||
|
|
Loading…
Reference in New Issue