188 lines
6.3 KiB
Python
188 lines
6.3 KiB
Python
from .checksum import checksum
|
|
from enum import Enum
|
|
|
|
|
|
class ReaderCommand(object):
|
|
|
|
def __init__(self, cmd, addr=0xFF, data=[]):
|
|
self.addr = addr
|
|
self.cmd = cmd
|
|
self.data = data
|
|
|
|
def serialize(self):
|
|
frame_length = 4 + len(self.data)
|
|
base_data = bytearray([ frame_length, self.addr, self.cmd ]) + bytearray(self.data)
|
|
base_data.extend(bytearray(self.checksum_bytes(base_data)))
|
|
return base_data
|
|
|
|
def checksum_bytes(self, data_bytes):
|
|
crc = checksum(data_bytes)
|
|
crc_msb = crc >> 8
|
|
crc_lsb = crc & 0xFF
|
|
return bytearray([ crc_lsb, crc_msb ])
|
|
|
|
|
|
class CommandRunner(object):
|
|
|
|
def __init__(self, transport):
|
|
self.transport = transport
|
|
|
|
def run(self, command):
|
|
self.transport.write(command.serialize())
|
|
return self.transport.read_frame()
|
|
|
|
|
|
class ReaderResponseFrame(object):
|
|
|
|
def __init__(self, resp_bytes, offset=0):
|
|
if len(resp_bytes) < 5:
|
|
raise ValueError('Response must be at least 5 bytes')
|
|
self.len = resp_bytes[offset]
|
|
if self.len + offset > len(resp_bytes) - 1:
|
|
raise ValueError('Response does not contain enough bytes for frame (expected %d bytes after offset %d, actual length %d)' % (self.len, offset, len(resp_bytes)))
|
|
self.reader_addr = resp_bytes[offset+1]
|
|
self.resp_cmd = resp_bytes[offset+2]
|
|
self.result_status = resp_bytes[offset+3]
|
|
self.data = resp_bytes[offset+4:offset+self.len-1]
|
|
cs_status = self.verify_checksum(resp_bytes[offset:offset+self.len-1], resp_bytes[offset+self.len-1:offset+self.len+1])
|
|
if cs_status is not True:
|
|
raise(ValueError('Checksum does not match'))
|
|
|
|
def verify_checksum(self, data_bytes, checksum_bytes):
|
|
data_crc = checksum(bytearray(data_bytes))
|
|
crc_msb = data_crc >> 8
|
|
crc_lsb = data_crc & 0xFF
|
|
return checksum_bytes[0] == crc_lsb and checksum_bytes[1] == crc_msb
|
|
|
|
def __len__(self):
|
|
return self.len
|
|
|
|
def get_data(self):
|
|
return self.data
|
|
|
|
|
|
class ReaderFrequencyBand(Enum):
|
|
|
|
China2 = 0b0001
|
|
US = 0b0010
|
|
Korea = 0b0011
|
|
EU = 0b0100
|
|
Ukraine = 0b0110
|
|
Peru = 0b0111
|
|
China1 = 0b1000
|
|
EU3 = 0b1001
|
|
Taiwan = 0b1010
|
|
US3 = 0b1100
|
|
|
|
|
|
class ReaderType(Enum):
|
|
|
|
RRU9803M = 0x03 # CF-RU5102 (desktop USB reader/writer, as specified)
|
|
RRU9803M_1 = 0x08 # CF-RU5102 (desktop USB reader/writer, actual)
|
|
UHFReader18 = 0x09
|
|
UHFReader288M = 0x0c
|
|
UHFReader86 = 0x0f # CF-MU903/CF-MU904 (as documented)
|
|
UHFReader86_1 = 0x10 # CF-MU903/CF-MU904 (actual)
|
|
RRU9883M = 0x16 # CF-MU902
|
|
UHFReader288MP = 0x20 # CF-MU804
|
|
|
|
|
|
class ReaderInfoFrame(ReaderResponseFrame):
|
|
|
|
def __init__(self, resp_bytes):
|
|
super(ReaderInfoFrame, self).__init__(resp_bytes)
|
|
if len(self.data) >= 8:
|
|
self.firmware_version = self.data[0:2]
|
|
self.type = ReaderType(self.data[2])
|
|
self.supports_6b = (self.data[3] & 0b01) > 0
|
|
self.supports_6c = (self.data[3] & 0b10) > 0
|
|
dmaxfre = self.data[4]
|
|
dminfre = self.data[5]
|
|
self.max_frequency = dmaxfre & 0b00111111
|
|
self.min_frequency = dminfre & 0b00111111
|
|
self.frequency_band = ReaderFrequencyBand(((dmaxfre & 0b11000000 ) >> 4) + ((dminfre & 0b11000000 ) >> 6))
|
|
self.power = self.data[6]
|
|
self.scan_time = self.data[7]
|
|
else:
|
|
raise ValueError('Data must be at least 8 characters')
|
|
|
|
def get_regional_frequency(self, fnum):
|
|
if self.frequency_band is ReaderFrequencyBand.EU:
|
|
return 865.1 + fnum * 0.2
|
|
elif self.frequency_band is ReaderFrequencyBand.China2:
|
|
return 920.125 + fnum * 0.25
|
|
elif self.frequency_band is ReaderFrequencyBand.US:
|
|
return 902.75 + fnum * 0.5
|
|
elif self.frequency_band is ReaderFrequencyBand.Korea:
|
|
return 917.1 + fnum * 0.2
|
|
elif self.frequency_band is ReaderFrequencyBand.Ukraine:
|
|
return 868.0 + fnum * 0.1
|
|
elif self.frequency_band is ReaderFrequencyBand.Peru:
|
|
return 916.2 + fnum * 0.9
|
|
elif self.frequency_band is ReaderFrequencyBand.China1:
|
|
return 840.125 + fnum * 0.25
|
|
elif self.frequency_band is ReaderFrequencyBand.EU3:
|
|
return 865.7 + fnum * 0.6
|
|
elif self.frequency_band is ReaderFrequencyBand.US3:
|
|
return 902 + fnum * 0.5
|
|
elif self.frequency_band is ReaderFrequencyBand.Taiwan:
|
|
return 922.25 + fnum * 0.5
|
|
else:
|
|
return fnum
|
|
|
|
def get_min_frequency(self):
|
|
return self.get_regional_frequency(self.min_frequency)
|
|
|
|
def get_max_frequency(self):
|
|
return self.get_regional_frequency(self.max_frequency)
|
|
|
|
|
|
class G2InventoryResponse(object):
|
|
|
|
frame_class = None
|
|
|
|
def __init__(self, resp_bytes):
|
|
self.resp_bytes = resp_bytes
|
|
|
|
def get_frame(self):
|
|
offset = 0
|
|
while offset < len(self.resp_bytes):
|
|
next_frame = self.frame_class(self.resp_bytes, offset=offset)
|
|
offset += len(next_frame) + 1 # For a frame with stated length N there are N+1 bytes
|
|
yield next_frame
|
|
|
|
def get_tag(self):
|
|
for response_frame in self.get_frame():
|
|
for tag in response_frame.get_tag():
|
|
yield tag
|
|
|
|
|
|
class TagData(object):
|
|
|
|
def __init__(self, resp_bytes, prefix_bytes=0, suffix_bytes=0):
|
|
self.prefix_bytes = prefix_bytes
|
|
self.suffix_bytes = suffix_bytes
|
|
self.data = resp_bytes
|
|
self.num_tags = resp_bytes[0]
|
|
|
|
def get_tag_data(self):
|
|
n = 0
|
|
pointer = 1
|
|
while n < self.num_tags:
|
|
tag_len = int(self.data[pointer])
|
|
tag_data_start = pointer + 1
|
|
tag_main_start = tag_data_start + self.prefix_bytes
|
|
tag_main_end = tag_main_start + tag_len
|
|
next_tag_start = tag_main_end + self.suffix_bytes
|
|
yield (self.data[tag_data_start:tag_main_start], self.data[tag_main_start:tag_main_end], self.data[tag_main_end:next_tag_start])
|
|
pointer = next_tag_start
|
|
n += 1
|
|
|
|
|
|
class Tag(object):
|
|
|
|
def __init__(self, epc, antenna_num=1, rssi=None):
|
|
self.epc = epc
|
|
self.antenna_num = antenna_num
|
|
self.rssi = rssi
|