SPV Enable:
Python HDL Verification
Python as other language have advantages and disadvantages.
New: Using python as a verification environment.
We add in SPV a package that support using python as a verification environment. A special python package supplied with interface to HDL include the following python classes:
-
Signal access READ/WRITE.
-
Create HDL event and enable Wait(HdlEvent),
-
Scheduler that enable:
-
create python thread (TCM).
-
Manage all thread in simulation time.
-
-
Global method that enable:
-
Advance time.
-
pass to simulator prompt and return.
-
Print or use time as a variable.
-
-
All python capabilities free for any using in the verification environment.
Example:
In this example we drive packet to design, while the Monitor/Checker check correction. There are 4 interfaces each Driver/Monitor instance take care of 1 interface.
The following function display:
-
Driver function -- Drive NumPacket that configured by Python random
-
Monitor function -- Monitor data and check correction, mutual score board fill by driver, while monitor collect data and make compare.
-
Generator - Generate interface and NumPacket.
-
Test - Launch Monitor and Driver for NumPacket driving and checking.
Driver code
# Driver code drive 'NumFrame=X' packets
# import scheduler and HDL python interface
from SpvPython import *
# import python signal interface to the HDL itself
from AllSig import *
# import random to generate numPackeet .
import random
# import current Generator that generate the HDL interface.
from Generator import *
def DriveProc(id: int):
newVal = 0;#value to set to m_SigData
#There are 4 interface get current HDL interface
MiiInf = MiiInterface(id);
# Create event
clkPos = SPSigEvent(MiiInf.SigClk,AtPos);
#example how to declare signal access local.
SigData= SPSig(TOP_0 + "miiD" + str(id));
MiiInf.SigEn.Set(0);#Set data to HDL
StrPrefix = "Driver" + str(id) + ":";
#Loop that send NumFrame packets
for FrameIndex in range (0,NumFrame):
GlobSched.WaitEv(clkPos); #Wait clk
for SliceInd in range(0,99):
if SliceInd is 0:
MiiInf.SigEn.Set(1);
#Set enable when start new frame
else
MiiInf.SigErr.Set(0);
#drive random data using pythom random
newVal=(int(random.random()*0x7FFFFFFF))%256;
SigData.Set(newVal);
#Prepare the expected formonitor MiiInf.listData.append(newVal+6); GlobSched.WaitEv(clkPos);
#before new loop set enable '0'
MiiInf.SigEn.Set(0);
#wait 1-10 clocks
numWait=(int(random.random()*0x7FFFFFFF)%10) +4;
#Wait some clocks before next packet
for i in range (0,numWait): GlobSched.WaitEv(clkPos);
Monitor code
# Monitor code drive 'NumFrame=X' packets
# import scheduler and HDL python interface
from SpvPython import *
# import python signal interface to the HDL itself
from AllSig import *
# import random to generate numPackeet .
import random
# import current Generator that generate the HDL interface.
from Generator import *
# import Entry SPV which is the SPV interface to python.
import SPV
def MonitorProc(id: int):
getVal = 0;
PacketInd = 0;
#Get interface [0..3] with the score board pointer
MiiInf = MiiInterface(id);
# create event on clock posedge
clkPos =SPSigEvent(MiiInf.SigClk,AtPos);
SigData =SPSig(TOP_0+"miiD"+str(id)); #example how to declare local signal interface
DataValidPos=SPSigEvent(MiiInf.SigEn,AtPos)
StrPrefix = "Monitor" + str(id) + ":";
while True:
SliceInd = 0;
if MiiInf.SigEn.Val64() is 0:
GlobSched.WaitEv(DataValidPos);
#wait 1 clock any way
GlobSched.WaitEv(clkPos);
while MiiInf.SigEn.Val64() is 1:
Time = "T: " + str(SPV.SimTime());
getVal = SigData.Val64();
if (len(MiiInf.listData) >0):
if (MiiInf.listData[0] != getVal):
print(StrPrefix," Error** in pkt ",PacketInd,"derive:",MiiInf.listData[0]," Got ",getVal,Time);
else:
print (StrPrefix, " OK in packet ",PacketInd,"Slice ",SliceInd," is ",getVal,"T: ",Time);
#throw frame after finish check
MiiInf.listData.pop(0);
GlobSched.WaitEv(clkPos);
SliceInd = SliceInd + 1;
PacketInd= PacketInd +1;
if PacketInd >= NumFrame:
GlobSched.WaitEv(clkPos);
return;
Generator code
# Generator code , generate the interface and score board
import random
from AllSig import *
# score board for interface 0..3
listOfData0 = [];
listOfData1 = [];
listOfData2 = [];
listOfData3 = [];
NumFrame = 3; # default
def GenNumFrame(max: int):
NumFrame=(int(random.random()*0x7FFFFFFF)% max)+1;
class MiiInterface():
def __init__(self,miiInd: int):
self.strId=str(miiInd);
#initial clk[0..3] by dynamic string
self.SigClk=SPSig(TOP_0+"mii_clk"+miiInd);
# enable[0..3]
self.SigEn=SPSig(TOP_0+"mii_en"+miiInd);
#err[0..3]
self.SigErr=SPSig(TOP_0+"mii_err"+miiInd);
exec("self.listData=listOfData%d"%miiInd));
# create string on line
Test Code
# Start Driver and monitor code as a thread, they start immediately.
import Driver
import Monitor
def Test():
#create driver + monitor interface 0
GlobSched.AddThread(Driver.DriveProc,'Dr0',0)
GlobSched.AddThread(Monitor.MonitorProc,'Mon0',0)
# Run SpvPython scheduler, start all the TCM as a HDL thread
GlobSched.Run();
HDL Interface
#This list of signal created automatically by SPV HDL explorer.
#when such line declare, it immediately connected to design
from SpvPython import *;
#Path list
TOP_0 = "topTb." # the full path string
m_Sigclk_in = SPSig( TOP_0 + "clk_in");
m_Sigmain_rst = SPSig( TOP_0 + "main_rst");
m_Sigclk0 = SPSig( TOP_0 + "clk0");
m_SigdataIn0 = SPSig( TOP_0 + "dataIn0");
m_SigdataOut0 = SPSig( TOP_0 + "dataOut0");
m_Sigclk1 = SPSig( TOP_0 + "clk1");
m_SigdataIn1 = SPSig( TOP_0 + "dataIn1");
m_SigdataOut1 = SPSig( TOP_0 + "dataOut1");
m_Sigclk2 = SPSig( TOP_0 + "clk2");
m_SigdataIn2 = SPSig( TOP_0 + "dataIn2");
m_SigdataOut2 = SPSig( TOP_0 + "dataOut2");
m_Sigmii_clk0 = SPSig( TOP_0 + "mii_clk0");
m_SigmiiD0 = SPSig( TOP_0 + "miiD0");
m_Sigmii_en0 = SPSig( TOP_0 + "mii_en0");
m_Sigmii_err0 = SPSig( TOP_0 + "mii_err0");
m_Sigmii_clkPy = SPSig( TOP_0 + "mii_clkPy");
m_SigmiiDPy = SPSig( TOP_0 + "miiDPy");
m_Sigmii_enPy = SPSig( TOP_0 + "mii_enPy");
m_Sigmii_errPy = SPSig( TOP_0 + "mii_errPy");
m_Sigmii_clk1 = SPSig( TOP_0 + "mii_clk1");
Python advantages

Other Scheduler interface supply with product
Python testbench - design from Simulator page (X5)
file AllSig.py
#list of signal indicated in function SpvConfig::CreateSpvHdlInf
#created automatically and could be extend by user.
from SpvPython import *;
#Path list
TOP_0 = "TopCalc."
m_SigData1 = SPSig( TOP_0 + "Data1");
m_SigData2 = SPSig( TOP_0 + "Data2");
m_SigData2Slice = SPSig( TOP_0 + "Data2Slice");
m_Sigadd_a = SPSig( TOP_0 + "add_a");
m_Sigadd_b = SPSig( TOP_0 + "add_b");
m_Sigadd_res = SPSig( TOP_0 + "add_res");
m_Sigand_a = SPSig( TOP_0 + "and_a");
m_Sigand_b = SPSig( TOP_0 + "and_b");
m_Sigand_res = SPSig( TOP_0 + "and_res");
m_Sigassign_clk = SPSig( TOP_0 + "assign_clk");
m_Sigcase_clk = SPSig( TOP_0 + "case_clk");
m_Sigcase_data = SPSig( TOP_0 + "case_data");
m_Sigcase_value = SPSig( TOP_0 + "case_value");
m_Sigclk_add = SPSig( TOP_0 + "clk_add");
m_Sigclk_and = SPSig( TOP_0 + "clk_and");
m_Sigclk_div = SPSig( TOP_0 + "clk_div");
m_Sigclk_factorial = SPSig( TOP_0 + "clk_factorial");
m_Sigclk_mode = SPSig( TOP_0 + "clk_mode");
m_Sigclk_mult = SPSig( TOP_0 + "clk_mult");
m_Sigclk_or = SPSig( TOP_0 + "clk_or");
m_Sigclk_shiftL = SPSig( TOP_0 + "clk_shiftL");
m_Sigclk_shiftR = SPSig( TOP_0 + "clk_shiftR");
m_Sigclk_sub = SPSig( TOP_0 + "clk_sub");
m_Sigclk_xor = SPSig( TOP_0 + "clk_xor");
m_Sigcrc_clock = SPSig( TOP_0 + "crc_clock");
m_Sigcurled_clk = SPSig( TOP_0 + "curled_clk");
m_Sigdiv_a = SPSig( TOP_0 + "div_a");
m_Sigdiv_b = SPSig( TOP_0 + "div_b");
m_Sigdiv_res = SPSig( TOP_0 + "div_res");
m_Sigfifo_count = SPSig( TOP_0 + "fifo_count");
m_Sigfifo_empty = SPSig( TOP_0 + "fifo_empty");
m_Sigfifo_full = SPSig( TOP_0 + "fifo_full");
m_Sigfifo_size = SPSig( TOP_0 + "fifo_size");
m_Sighave_2_write = SPSig( TOP_0 + "have_2_write");
m_Sigmode_a = SPSig( TOP_0 + "mode_a");
m_Sigmode_b = SPSig( TOP_0 + "mode_b");
m_Sigmode_res = SPSig( TOP_0 + "mode_res");
m_Sigmult_a = SPSig( TOP_0 + "mult_a");
m_Sigmult_b = SPSig( TOP_0 + "mult_b");
m_Sigmult_res = SPSig( TOP_0 + "mult_res");
m_Sigor_a = SPSig( TOP_0 + "or_a");
m_Sigor_b = SPSig( TOP_0 + "or_b");
m_Sigor_res = SPSig( TOP_0 + "or_res");
m_SigregA = SPSig( TOP_0 + "regA");
m_SigregB = SPSig( TOP_0 + "regB");
m_SigregC = SPSig( TOP_0 + "regC");
m_SigregD = SPSig( TOP_0 + "regD");
m_SigshiftL_a = SPSig( TOP_0 + "shiftL_a");
m_SigshiftL_b = SPSig( TOP_0 + "shiftL_b");
m_SigshiftL_res = SPSig( TOP_0 + "shiftL_res");
m_SigshiftR_a = SPSig( TOP_0 + "shiftR_a");
m_SigshiftR_b = SPSig( TOP_0 + "shiftR_b");
m_SigshiftR_res = SPSig( TOP_0 + "shiftR_res");
m_Sigsub_a = SPSig( TOP_0 + "sub_a");
m_Sigsub_b = SPSig( TOP_0 + "sub_b");
m_Sigsub_res = SPSig( TOP_0 + "sub_res");
m_Siguse_SpvTb = SPSig( TOP_0 + "use_SpvTb");
m_SigwireA = SPSig( TOP_0 + "wireA");
m_SigwireB = SPSig( TOP_0 + "wireB");
m_SigwireC = SPSig( TOP_0 + "wireC");
m_Sigwire_2_read = SPSig( TOP_0 + "wire_2_read");
m_Sigwire_2_write = SPSig( TOP_0 + "wire_2_write");
m_SigxorData1 = SPSig( TOP_0 + "xorData1");
m_Sigxor_a = SPSig( TOP_0 + "xor_a");
m_Sigxor_b = SPSig( TOP_0 + "xor_b");
m_Sigxor_res = SPSig( TOP_0 + "xor_res");
file SpvPython.py
This file is the python interface to SPV, it enable schedule simulator event, doing Wait([event,time]), declare signal, generator and more. this is the main file that enable using python as a verification environment.
from ctypes import *
from enum import Enum
import sys
import threading
import time
import datetime
import SPV
class SPSig():
def __init__(self, SigFullPath):
self.m_Name = SigFullPath;
self.SigId = SPV.InitSig(SigFullPath);
def Set(self,value):
SPV.Set2Sig(int(value),self.SigId);
def Val64(self):
val = SPV.SigVal64(self.SigId);
return val;
def Val32(self):
val = SPV.SigVal32(self.SigId);
return val;
def Name(self):
return self.m_Name;
#need to implement
class SpvGen():
def __init__(self, GenGuiName):
self.m_Name = GenGuiName;
self.GenId = SPV.GetGuiGen(GenGuiName);
def Gen(self):
val = SPV.Gen(self.GenId);
return genVal;
class SPEvType(Enum):
AtPos = 0
AtNeg = 1
AtChange = 2
AtEqual = 3
def Uint(self):
return self.value
def __str__(self):
return 'Event'.format(self.value)
#Short key for any using.
AtPos = SPEvType.AtPos
AtNeg = SPEvType.AtNeg
AtChange = SPEvType.AtChange
AtEqual = SPEvType.AtEqual
class SPSigEvent():
def __init__(self, sig : SPSig,ev : SPEvType):
self.m_Sig = sig;
self.m_EvType = 1;#ev; SHMUEL need to fix why self.m_EvType = ev is not good.
self.m_IsTime = 0;
self.m_SpvEvId = SPV.InitSigEv(self.m_Sig.SigId,self.m_EvType);#SpvEvId comming from C code (of SPV).
#work around of problem with print (Bacause SPV stole any print to ActiveOut class), use PR that print output to stderr
#which work well
pyLog = open('Python.log', 'w');
def PR(s):
print(str(s),file=sys.stderr);
pyLog.write(str(s) + "\n");
pyLog.flush();
return
############### Python-Spv Scheduler
#these are the field that user should supply for any thread that used as SPV TCM
#Wait(Event) and any other timming ability that comming from SPV.
class ThreadParam():
def __init__(self,DefFunction,name: str,id: int):
self.ThreadFunction = DefFunction;
self.m_ThreadName = name;
self.ThreadId = id;
#this class pass as is to SPV for any wait command. SPV create AtomicProcess CB StartAProc for always.
#when the event occure, SPV mark m_IsEvOccure = 1. the Scheduler function SPSchedulerCenter::Run
#run the event that m_IsEvOccure = 1, if there is again Wait we SPV check if the CB is for the same event.
#if the answer is True, it is just wait to next event occuration, else it kill the prev event and make new one.
#class OneCEvent(Structure):
# _fields_ = [
# ("m_SigInd", c_uint),#the pointer of the signal that the event (Pos,Neg<change) wait for it.
# ("m_EventType", c_uint),#Pos,Neg,change
# ("m_IsTime", c_uint),#if is time it is Wait (time)
# ("m_Delay", c_uint),#the time in case m_IsTime = True
# ("m_IsEvOccure",c_uint),#fill by SPV when the event occure.
# ("m_IsAsPrev", c_uint),#Python fill this, if it is as prev we don't create CB, just use the prev CB.
# ("m_LastAProc", c_ulong)#the pointer of the last process control, if it is new Event we kill the old one, and create new one.
# ]
#when Python (SPV) create thread it create one instance of OneCEventInfo
#it old one CEvent
class OneCEventInfo():
def __init__(self,SpvTimeEvId: int,SpvEvId: int,ThreadBornIn : int,name: str):
#self.m_CEvent = CEvent;
self.m_TSpvEvId = SPV.InitSigEv(999,999);#Create time C event -- 999 will mark that it is time event
self.m_SpvEvId = SpvEvId;
self.m_BornInd = ThreadBornIn;#the SPV index of thread the first is 0.
self.m_ThreadEv = threading.Event();#Event object to use for call wait clear set .....
self.m_Name = name;
class SPSchedulerCenter():
def __init__(self):
self.numThread = 0;#this line and all lines with this variable should be deleted, don't need them.
#self.Events = [];
#self.OneCEventList = [];#all event that are in live (Wait event to occure, or occure now)
#self.EvMng = EventSHolder();#Manager to all event
self.ThreadParamList= [];#list of new thread to init.
self.MainEvent = threading.Event();
self.IsDebug = True;
self.OneCEventInfoLst=[];
self.CurrCEvInfo = None;
#self.TimeCEvInfo = None;
self.ThreadCounter = 0;
self.m_ToStop = False;
self.IsThreadKilled = False;
def WaitEv(self,SPSigEvent):
prevSpvId = self.CurrCEvInfo.m_SpvEvId;
self.CurrCEvInfo.m_SpvEvId = SPSigEvent.m_SpvEvId;#update any way
SPV.AddPyCb(prevSpvId,SPSigEvent.m_SpvEvId);
if self.MainEvent.is_set() is False:
self.MainEvent.set();#release main thread
self.CurrCEvInfo.m_ThreadEv.clear();#because if scheduler release it, it is true I must clear it to be false.
self.CurrCEvInfo.m_ThreadEv.wait();
#if self.IsDebug is True:
#PR(threading.current_thread().name + " Wake up at time " + str(SPV.SimTime()));
return;
def WaitTime(self,timeDelay : int):
#PR("WaitTime : " + str(timeDelay));
#SetTime2EvCb(timeDelay,self.TimeCEvInfo.m_SpvEvId);
prevSpvId = self.CurrCEvInfo.m_SpvEvId;
self.CurrCEvInfo.m_SpvEvId = self.CurrCEvInfo.m_TSpvEvId;#update any way
SPV.AddTimePyCb(prevSpvId,self.CurrCEvInfo.m_SpvEvId,timeDelay);
if self.MainEvent.is_set() is False:
self.MainEvent.set();#release main thread
self.CurrCEvInfo.m_ThreadEv.clear();#because if scheduler release it, it is true I must clear it to be false.
self.CurrCEvInfo.m_ThreadEv.wait();
#if self.IsDebug is True:
#PR(threading.current_thread().name + " Wake up at time " + str(SPV.SimTime()));
return;
def SetDebug(self,isDebug : bool):
self.IsDebug = isDebug;
def DoStop(self):
self.m_ToStop = True;
#SHMUEL: I want to start thread, that will call wait how much time it need.
#when it finish, I must know about because I want to release the main thread. the solution
#is to start any thread with this function, which will call immediatelly to the user function.
#when user function finish it will return to this function, which release the main thread.
def ThreadStartPoint(self,threadParam : ThreadParam):
if self.IsDebug is True:
PR(threadParam.m_ThreadName + " StartUp with id " + str(threadParam.ThreadId));
#create thread info which will be alive all the thread live.
self.CurrCEvInfo = OneCEventInfo(888888888,999999999,self.ThreadCounter,threadParam.m_ThreadName);
self.OneCEventInfoLst.append(self.CurrCEvInfo);
currThreadInd = self.ThreadCounter;
self.ThreadCounter = self.ThreadCounter + 1;
threadParam.ThreadFunction(threadParam.ThreadId);
#when it come to this line, this is the time that thread dead.
#so I want to del the OneCEventInfo from list.
listLen = len(self.OneCEventInfoLst);
indexToPop=0;
for i in range(0,listLen):
if self.OneCEventInfoLst[i].m_BornInd is currThreadInd:
indexToPop = i;
break;
if self.IsDebug is True:
PR("Kill thread " + str(indexToPop) + " " + self.CurrCEvInfo.m_Name + " at time " + str(SPV.SimTime()));
SPV.PyLastCbKill(self.OneCEventInfoLst[indexToPop].m_SpvEvId);
self.OneCEventInfoLst.pop(indexToPop);
self.IsThreadKilled = True;
self.MainEvent.set();
def Run(self):
#run first all the new thread.
self.m_ToStop = False;#to enable continue after error or any user stop
while True:
#if self.IsDebug is True:
# PR("Len of self.ThreadParamList == " + str(len(self.ThreadParamList)));
for ThP in self.ThreadParamList:
#theThread = threading.Thread(name=newThread.m_ThreadName, target=newThread.ThreadFunction);
theThread = threading.Thread(name=ThP.m_ThreadName, target=self.ThreadStartPoint,args=(ThP,));
theThread.start();
self.MainEvent.wait();
self.MainEvent.clear();#for next time set it false.
self.ThreadParamList.clear();
#loop on CB from SPV to run now.
if (len(self.OneCEventInfoLst) == 0):
return;
listCEVLen = len(self.OneCEventInfoLst);
currCEVInd = 0;
#for currCEV in range(0,listCEVLen):
#for ThCEvInfo in self.OneCEventInfoLst:
advance = 1;
while currCEVInd < listCEVLen:
#if self.IsDebug is True:
# PR(" listCEVLen == " + str(listCEVLen) + " at time " + str(SPV.SimTime()));
#if (self.OneCEventInfoLst[currCEVInd].m_CEvent.m_IsEvOccure == 1):
if (SPV.IsEvOccure(self.OneCEventInfoLst[currCEVInd].m_SpvEvId)):
self.CurrCEvInfo = self.OneCEventInfoLst[currCEVInd];#ThCEvInfo;
#if self.IsDebug is True:
# PR(" m_IsEvOccure == True for thread " + self.CurrCEvInfo.m_Name + " at time " + str(SPV.SimTime()));
self.OneCEventInfoLst[currCEVInd].m_ThreadEv.set();
#self.CurrCEvInfo.m_ThreadEv.set();#ThCEvInfo.m_ThreadEv.set();#release running thread.
self.MainEvent.wait();
self.MainEvent.clear();#for next time set it false.
if self.IsThreadKilled is True:
listCEVLen = listCEVLen -1;
self.IsThreadKilled = False;
advance = 0;#because one CEvent deleted from list, so i not advance the counter because the next become the current
PR(" currCEVInd " + str(currCEVInd));
currCEVInd = currCEVInd + advance;
advance = 1;
#if all thread finish, return to prompt.
if (len(self.OneCEventInfoLst) == 0):
return;
if self.m_ToStop is True:
return;
SPV.PSRun();
def AddThread(self,functionPointer,name: str,id : int):
self.ThreadParamList.append(ThreadParam(functionPointer,name,id));
GlobSched = SPSchedulerCenter();#global scheduler.
file Test0.py
example of test, this test import all what need to start running test. it's start all the thread (TCM - enabele Wait([event], [time]).
from SpvPython import *
from AllSig import *
import SPV
from SPV import *
import time
import datetime
import DriveOperand
#from datetime import datetime
import threading
def Test():
#print time with millisecond
#PR(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]);
PR("Start Test");
datetime.datetime.now(datetime.timezone.utc);
#GlobSched.SetDebug(True);
#the Test itself.
SPV.Wait(1);
m_Siguse_SpvTb.Set(1);
GlobSched.AddThread(DriveOperand.DriveAdd,'DriveAdd',0);
GlobSched.AddThread(DriveOperand.DriveSub,'DriveSub',0);
GlobSched.AddThread(DriveOperand.DriveOr,'DriveOr',0);
GlobSched.AddThread(DriveOperand.DriveXor,'DriveXor',0);
GlobSched.AddThread(DriveOperand.DriveMult,'DriveMult',0);
GlobSched.AddThread(DriveOperand.DriveMode,'DriveMode',0);
GlobSched.AddThread(DriveOperand.DriveAnd,'DriveAnd',0);
GlobSched.AddThread(DriveOperand.DriveShiftL,'DriveShiftL',0);
GlobSched.AddThread(DriveOperand.DriveShiftR,'DriveShiftR',0);
GlobSched.AddThread(DriveOperand.DriveCrcData,'DriveCrcData',0);
GlobSched.AddThread(DriveOperand.DriveCurled,'DriveCurled',0);
GlobSched.Run();
#print time with millisecond
#PR(datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]);
datetime.datetime.now(datetime.timezone.utc);
file DriveOperand.py
This file includes all TCM (Thread) on per operand in verilog. it's drive and check design each clock. it report error and summary in the end.
#import SPSched
from SpvPython import *
from AllSig import *
import random
from random import randrange #, uniform
from datetime import datetime
from SPV import *
LoopSize = 40001;
#operator Add +
def DriveAdd(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+str(SPV.SimTime());
m_AddClkPos = SPSigEvent(m_Sigclk_add,AtPos);
GlobSched.WaitEv(m_AddClkPos);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
#GlobSched.WaitEv(m_AddClkNeg);
GlobSched.WaitTime(40);
if (ActionInd > 0) :
resHdl = (m_Sigadd_res.Val32())&0xFFFFFFFF;
if (res != resHdl):
PR("DriveAdd: Err at ind "+str(ActionInd)+" adda = "+str(adda)+" addb="); PR(str(addb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
adda = SPV.Gen()%0x6FFFFFFF;
addb = SPV.Gen()%0x6FFFFFFF;
res = (adda + addb)&0xFFFFFFFF;#Expected
m_Sigadd_a.Set(adda);
m_Sigadd_b.Set(addb);
PR("DriveAdd: numErr=" + str(numErr) + " numOk=" + str(numOk));
#### operator SUB -
def DriveSub(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+ str(SPV.SimTime()));
m_SubClkPos = SPSigEvent(m_Sigclk_sub,AtPos);
GlobSched.WaitEv(m_SubClkPos);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
#GlobSched.WaitEv(m_SubClkNeg);
GlobSched.WaitTime(50);
#PR("DriveSub: After wait Time: " + str(SPV.SimTime()));
if (ActionInd > 0) :
resHdl = (m_Sigsub_res.Val32())&0xFFFFFFFF;
if (res != resHdl):
PR("DriveSub: Error at ind "+str(ActionInd)+" suba = "+str(suba)+" subb=");
PR(+ str(subb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
suba= SPV.Gen()%0x6FFFFFFF;
subb= SPV.Gen()%0x6FFFFFFF;
res = (suba - subb) &0xFFFFFFFF;
m_Sigsub_a.Set(suba);
m_Sigsub_b.Set(subb);
PR("DriveSub: numErr=" + str(numErr) + " numOk=" + str(numOk));
#### operator OR |
def DriveOr(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+str(SPV.SimTime()));
m_OrClkNeg = SPSigEvent(m_Sigclk_or,AtNeg);
GlobSched.WaitEv(m_OrClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_OrClkNeg);
if (ActionInd > 0) :
resHdl = (m_Sigor_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveOr: Error at ind " + str(ActionInd) + " ora = " + str(ora) + " orb=") PR(str(orb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
#Next iteration
ora = SPV.Gen();
orb = SPV.Gen();
res = (ora | orb) &0xFFFFFFFF;
m_Sigor_a.Set(ora);
m_Sigor_b.Set(orb);
PR("DriveOr: numErr=" + str(numErr) + " numOk=" + str(numOk));
#### Operator xor ^
def DriveXor(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+str(SPV.SimTime()));
m_XorClkNeg = SPSigEvent(m_Sigclk_xor,AtNeg);
GlobSched.WaitEv(m_XorClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_XorClkNeg);
if (ActionInd > 0):
resHdl = (m_Sigxor_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveXor: Error at ind " + str(ActionInd) + " xora = " + str(xora) + " xorb=" + str(xorb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
xora = SPV.Gen();
xorb = SPV.Gen();
res = (xora ^ xorb) &0xFFFFFFFF;
m_Sigxor_a.Set(xora);
m_Sigxor_b.Set(xorb);
PR("DriveXor: numErr=" + str(numErr) + " numOk=" + str(numOk));
#### operator Mode %
def DriveMode(id: int):
PR(threading.current_thread().name+" thread id = "+str(id)+" start at T: "+str(SPV.SimTime()));
m_ModeClkNeg = SPSigEvent(m_Sigclk_mode,AtNeg);
GlobSched.WaitEv(m_ModeClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_ModeClkNeg);
if (ActionInd > 0):
resHdl = (m_Sigmode_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveMode: Error at ind " + str(ActionInd) + " modea = " + hex(modea));
PR(modeb=" + hex(modeb) + " res=" + hex(res) + " resHdl=" + hex(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
modea = SPV.Gen() & 0x7FFFFFFF;
modeb = SPV.Gen() & 0x7FFFFFFF;
res = (modea % modeb) &0xFFFFFFFF;
res = (modea % modeb) &0xFFFFFFFF;
m_Sigmode_a.Set(modea);
m_Sigmode_b.Set(modeb);
PR("DriveMode: numErr=" + str(numErr) + " numOk=" + str(numOk))
#### operator Mult *
def DriveMult(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+str(SPV.SimTime()));
m_MultClkNeg = SPSigEvent(m_Sigclk_mult,AtNeg);
GlobSched.WaitEv(m_MultClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_MultClkNeg);
if (ActionInd > 0):
resHdl = (m_Sigmult_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveMult: Error at ind " + str(ActionInd) + " multa = " + str(multa));
PR(" multb=" + str(multb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
multa = SPV.Gen();
multb = SPV.Gen();
res = (multa * multb) &0xFFFFFFFF;
m_Sigmult_a.Set(multa);
m_Sigmult_b.Set(multb);
PR("DriveMult: numErr=" + str(numErr) + " numOk=" + str(numOk))
#### operator And &
def DriveAnd(id: int):
PR(threading.current_thread().name+" thread id="+str(id)+" start at Time: "+ str(SPV.SimTime()));
m_AndClkNeg = SPSigEvent(m_Sigclk_and,AtNeg);
GlobSched.WaitEv(m_AndClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_AndClkNeg);
if (ActionInd > 0) :
resHdl = (m_Sigand_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveAnd: Error at ind " + str(ActionInd) + " anda = " + str(anda) + " andb=" + str(andb) + " res=" + str(res) + " resHdl=" + str(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
anda = SPV.Gen();
andb = SPV.Gen();
res = (anda & andb) &0xFFFFFFFF;
m_Sigand_a.Set(anda);
m_Sigand_b.Set(andb);
PR("DriveAnd: numErr=" + str(numErr) + " numOk=" + str(numOk))
def DriveShiftL(id: int):
PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()));
m_ShiftLClkNeg = SPSigEvent(m_Sigclk_shiftL,AtNeg);
GlobSched.WaitEv(m_ShiftLClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_ShiftLClkNeg);
if (ActionInd) :
resHdl = (m_SigshiftL_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveShiftL: Error at ind " + str(ActionInd) + " shiftLa = " + hex(shiftLa) + " shiftLb=" + hex(shiftLb) + " res=" + hex(res) + " resHdl=" + hex(resHdl));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
shiftLa = SPV.Gen();
shiftLb = (SPV.Gen()%31);
res = (shiftLa << shiftLb) &0xFFFFFFFF;
m_SigshiftL_a.Set(shiftLa);
m_SigshiftL_b.Set(shiftLb);
PR("DriveShiftL: numErr=" + str(numErr) + " numOk=" + str(numOk))
def DriveShiftR(id: int):
PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()));
m_ShiftRClkNeg = SPSigEvent(m_Sigclk_shiftR,AtNeg);
GlobSched.WaitEv(m_ShiftRClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
GlobSched.WaitEv(m_ShiftRClkNeg);
if (ActionInd) :
resHdl = (m_SigshiftR_res.Val32()) &0xFFFFFFFF;
if (res != resHdl):
PR("DriveShiftR: Error at ind "+str(ActionInd)+" ShiftRa="+hex(ShiftRa)+"ShiftRb=")
PR(hex(ShiftRb)+" res="+hex(res)+" resHdl="+hex(resHdl)+" T:"+ str(SPV.Time()));
numErr = numErr+1;
return;
else:
numOk = numOk+1;
ShiftRa = SPV.Gen();
ShiftRb = (SPV.Gen()%32);
res = (ShiftRa >> ShiftRb) &0xFFFFFFFF;
m_SigshiftR_a.Set(ShiftRa);
m_SigshiftR_b.Set(ShiftRb);
PR("DriveShiftR: numErr=" + str(numErr) + " numOk=" + str(numOk));
#print(datetime.now());
#### Drive crc bits
def DriveCrcData(id: int):
PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()));
m_CrcClkNeg = SPSigEvent(m_Sigcrc_clock,AtNeg);
GlobSched.WaitEv(m_CrcClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
m_SigData1.Set(SPV.Gen());
m_SigData1.Set(SPV.Gen());
GlobSched.WaitEv(m_CrcClkNeg);
PR("DriveCrcData: finish ");
#### Cruly command {a,b,c} = {d,e,f}
def DriveCurled(id: int):
PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()));
m_CurledClkNeg = SPSigEvent(m_Sigcurled_clk,AtNeg);
GlobSched.WaitEv(m_CurledClkNeg);
numOk=0;numErr=0;
for ActionInd in range (0,LoopSize):
m_SigregA.Set(SPV.Gen());
m_SigregB.Set(SPV.Gen());
m_SigregC.Set(SPV.Gen());
m_SigregD.Set(SPV.Gen());
GlobSched.WaitEv(m_CurledClkNeg);
PR("DriveCurled: finish ");
print(datetime.now());
Result of python simulation with wave
