top of page

SPV Enable:

Python HDL Verification

Python, like any other language, has advantages and disadvantages.

For the list of advantages, click here.

New:  Using python as a verification environment.

SPV includes a package that supports using Python as a verification environment. This Python package provides an interface to the HDL and includes the following Python classes:

  •      Signal access READ/WRITE.

  •      Create HDL event and enable Wait(HdlEvent),

  •      Scheduler that enable:

    •  create python thread (TCM).​

    • Manage all thread in simulation time.

  • Global method that enable:​

    • Advance time.​

    • pass to simulator prompt and return.

    • Print or use time as a variable.

  • All python capabilities free for any using in the verification environment.​

    Example: 

In this example we drive packets into the design while the Monitor/Checker checks their correctness. There are 4 interfaces; each Driver/Monitor instance takes care of 1 interface.

The following function display:

  •     Driver function -- Drive NumPacket that configured by Python random

  •     Monitor function -- Monitors the data and checks its correctness. A shared scoreboard is filled by the driver, while the monitor collects the data and compares it.

  •     Generator  - Generate interface and NumPacket.

  •     Test - Launch Monitor and Driver for NumPacket driving and checking.

Driver  code

 

#  Driver code drive 'NumFrame=X' packets

# import scheduler and HDL python interface

from  SpvPython  import *  

# import python signal interface to the HDL itself

from  AllSig     import *  

# import random to generate numPacket.

import random              

# import current Generator that generate the HDL interface.

from   Generator import *  

def DriveProc(id: int):
    """Drive ``NumFrame`` random packets onto MII interface ``id``.

    Every packet is 99 data slices, one per rising edge of the interface
    clock.  The enable signal is raised on the first slice and dropped once
    the packet is finished.  For each slice the value the monitor should
    expect is appended to the interface's shared scoreboard
    (``MiiInf.listData``).

    Args:
        id: Index of the HDL interface this driver owns.
    """
    # Each Driver/Monitor pair owns one of the HDL interfaces.
    MiiInf = MiiInterface(id)
    # Event that fires on the rising edge of this interface's clock.
    clkPos = SPSigEvent(MiiInf.SigClk, AtPos)
    # Example of declaring a signal accessor locally.
    SigData = SPSig(TOP_0 + "miiD" + str(id))
    MiiInf.SigEn.Set(0)

    # Send NumFrame packets.
    for FrameIndex in range(0, NumFrame):
        GlobSched.WaitEv(clkPos)
        for SliceInd in range(0, 99):
            if SliceInd == 0:
                # Raise enable when a new frame starts.
                MiiInf.SigEn.Set(1)
            else:
                MiiInf.SigErr.Set(0)
            # Drive one random byte (0..255).
            newVal = int(random.random() * 0x7FFFFFFF) % 256
            SigData.Set(newVal)
            # Queue the value the monitor has to compare against.
            # NOTE(review): the +6 offset presumably mirrors what the design
            # adds to the data -- confirm against the HDL.
            MiiInf.listData.append(newVal + 6)
            GlobSched.WaitEv(clkPos)

        # Drop enable before the next frame.
        MiiInf.SigEn.Set(0)
        # Idle for 4..13 clocks before the next packet.
        numWait = (int(random.random() * 0x7FFFFFFF) % 10) + 4
        for _ in range(0, numWait):
            GlobSched.WaitEv(clkPos)

Up

 

Monitor code

Python advantages

#  Monitor code checks 'NumFrame=X' packets

# import scheduler and HDL python interface

from  SpvPython  import *  

# import python signal interface to the HDL itself

from  AllSig     import *  

# import random to generate  numPackeet . 

import random              

# import current Generator that generate the HDL interface.

from   Generator import * 

# import Entry SPV which is the SPV interface to python.

import SPV  

                                       

def MonitorProc(id: int):
    """Check the data seen on MII interface ``id`` against the scoreboard.

    While the interface's enable signal reads 1, the data signal is sampled
    on every rising clock edge and compared with the oldest entry of
    ``MiiInf.listData``; the entry is discarded after the comparison.  The
    function returns after ``NumFrame`` packets have been observed.

    Args:
        id: Index of the HDL interface this monitor owns.
    """
    PacketInd = 0
    # Interface object; it also carries the scoreboard shared with the driver.
    MiiInf = MiiInterface(id)
    # Event on the rising edge of the interface clock.
    clkPos = SPSigEvent(MiiInf.SigClk, AtPos)
    # Example of declaring a signal accessor locally.
    SigData = SPSig(TOP_0 + "miiD" + str(id))
    DataValidPos = SPSigEvent(MiiInf.SigEn, AtPos)
    StrPrefix = "Monitor" + str(id) + ":"

    while True:
        SliceInd = 0
        if MiiInf.SigEn.Val64() == 0:
            GlobSched.WaitEv(DataValidPos)
        # Wait one clock in any case.
        GlobSched.WaitEv(clkPos)

        while MiiInf.SigEn.Val64() == 1:
            Time = "T: " + str(SPV.SimTime())
            getVal = SigData.Val64()
            if MiiInf.listData:
                if MiiInf.listData[0] != getVal:
                    print(StrPrefix, " Error** in pkt ", PacketInd, "derive:", MiiInf.listData[0], " Got ", getVal, Time)
                else:
                    print(StrPrefix, " OK in packet ", PacketInd, "Slice ", SliceInd, " is ", getVal, "T: ", Time)
                # Discard the entry once it has been checked; only done when
                # an entry exists, so an empty scoreboard cannot raise.
                MiiInf.listData.pop(0)
            GlobSched.WaitEv(clkPos)
            SliceInd = SliceInd + 1

        PacketInd = PacketInd + 1
        if PacketInd >= NumFrame:
            GlobSched.WaitEv(clkPos)
            return

Up

Generator code

#  Generator code , generate the interface and score board

 import      random

 from AllSig import *    

 # score board for interface 0..3

 listOfData0  = [];      

 listOfData1  = [];               

 listOfData2  = [];               

 listOfData3  = [];               

 NumFrame     = 3; # default 

 def   GenNumFrame(max: int):

    NumFrame=(int(random.random()*0x7FFFFFFF)% max)+1;

 class MiiInterface():

    def __init__(self,miiInd: int):

       self.strId=str(miiInd);

       #initial clk[0..3] by dynamic string

                      self.SigClk=SPSig(TOP_0+"mii_clk"+miiInd);

          # enable[0..3] 

       self.SigEn=SPSig(TOP_0+"mii_en"+miiInd);

       #err[0..3]

      self.SigErr=SPSig(TOP_0+"mii_err"+miiInd); 

      exec("self.listData=listOfData%d"%miiInd));

# create string on line

Up

Test Code

# Start Driver and monitor code as a thread, they start immediately.

import Driver

import Monitor

def Test():
    """Start the driver and monitor of interface 0, then run the scheduler."""
    # Only Driver and Monitor are imported by this module, so the global
    # scheduler has to be brought into scope here.
    from SpvPython import GlobSched

    # Create driver + monitor for interface 0.
    GlobSched.AddThread(Driver.DriveProc, 'Dr0', 0)
    GlobSched.AddThread(Monitor.MonitorProc, 'Mon0', 0)

    # Run the SpvPython scheduler; it starts every added thread (TCM).
    GlobSched.Run()

Up

HDL Interface

#This list of signals is created automatically by the SPV HDL explorer.

#When such a line is declared, it is immediately connected to the design.

from SpvPython import *;

 

#Path list

# Prefix of the full hierarchical path of every signal below.
TOP_0 = "topTb."

# One accessor per HDL signal.  Every statement starts at column 0: an
# indented top-level statement is an IndentationError.
m_Sigclk_in = SPSig(TOP_0 + "clk_in")
m_Sigmain_rst = SPSig(TOP_0 + "main_rst")
m_Sigclk0 = SPSig(TOP_0 + "clk0")
m_SigdataIn0 = SPSig(TOP_0 + "dataIn0")
m_SigdataOut0 = SPSig(TOP_0 + "dataOut0")
m_Sigclk1 = SPSig(TOP_0 + "clk1")
m_SigdataIn1 = SPSig(TOP_0 + "dataIn1")
m_SigdataOut1 = SPSig(TOP_0 + "dataOut1")
m_Sigclk2 = SPSig(TOP_0 + "clk2")
m_SigdataIn2 = SPSig(TOP_0 + "dataIn2")
m_SigdataOut2 = SPSig(TOP_0 + "dataOut2")
m_Sigmii_clk0 = SPSig(TOP_0 + "mii_clk0")
m_SigmiiD0 = SPSig(TOP_0 + "miiD0")
m_Sigmii_en0 = SPSig(TOP_0 + "mii_en0")
m_Sigmii_err0 = SPSig(TOP_0 + "mii_err0")
m_Sigmii_clkPy = SPSig(TOP_0 + "mii_clkPy")
m_SigmiiDPy = SPSig(TOP_0 + "miiDPy")
m_Sigmii_enPy = SPSig(TOP_0 + "mii_enPy")
m_Sigmii_errPy = SPSig(TOP_0 + "mii_errPy")
m_Sigmii_clk1 = SPSig(TOP_0 + "mii_clk1")

Up

Python advantages

Other Scheduler interface supply with product

Python testbench - design from Simulator page (X5)

file AllSig.py 

#list of signal indicated in function SpvConfig::CreateSpvHdlInf
#created automatically and could be extend by user.
from SpvPython import *;

#Path list
# Prefix of the full hierarchical path of every signal below.
TOP_0 = "TopCalc."

# One accessor per HDL signal, created in the order listed.
m_SigData1 = SPSig(TOP_0 + "Data1")
m_SigData2 = SPSig(TOP_0 + "Data2")
m_SigData2Slice = SPSig(TOP_0 + "Data2Slice")
m_Sigadd_a = SPSig(TOP_0 + "add_a")
m_Sigadd_b = SPSig(TOP_0 + "add_b")
m_Sigadd_res = SPSig(TOP_0 + "add_res")
m_Sigand_a = SPSig(TOP_0 + "and_a")
m_Sigand_b = SPSig(TOP_0 + "and_b")
m_Sigand_res = SPSig(TOP_0 + "and_res")
m_Sigassign_clk = SPSig(TOP_0 + "assign_clk")
m_Sigcase_clk = SPSig(TOP_0 + "case_clk")
m_Sigcase_data = SPSig(TOP_0 + "case_data")
m_Sigcase_value = SPSig(TOP_0 + "case_value")
m_Sigclk_add = SPSig(TOP_0 + "clk_add")
m_Sigclk_and = SPSig(TOP_0 + "clk_and")
m_Sigclk_div = SPSig(TOP_0 + "clk_div")
m_Sigclk_factorial = SPSig(TOP_0 + "clk_factorial")
m_Sigclk_mode = SPSig(TOP_0 + "clk_mode")
m_Sigclk_mult = SPSig(TOP_0 + "clk_mult")
m_Sigclk_or = SPSig(TOP_0 + "clk_or")
m_Sigclk_shiftL = SPSig(TOP_0 + "clk_shiftL")
m_Sigclk_shiftR = SPSig(TOP_0 + "clk_shiftR")
m_Sigclk_sub = SPSig(TOP_0 + "clk_sub")
m_Sigclk_xor = SPSig(TOP_0 + "clk_xor")
m_Sigcrc_clock = SPSig(TOP_0 + "crc_clock")
m_Sigcurled_clk = SPSig(TOP_0 + "curled_clk")
m_Sigdiv_a = SPSig(TOP_0 + "div_a")
m_Sigdiv_b = SPSig(TOP_0 + "div_b")
m_Sigdiv_res = SPSig(TOP_0 + "div_res")
m_Sigfifo_count = SPSig(TOP_0 + "fifo_count")
m_Sigfifo_empty = SPSig(TOP_0 + "fifo_empty")
m_Sigfifo_full = SPSig(TOP_0 + "fifo_full")
m_Sigfifo_size = SPSig(TOP_0 + "fifo_size")
m_Sighave_2_write = SPSig(TOP_0 + "have_2_write")
m_Sigmode_a = SPSig(TOP_0 + "mode_a")
m_Sigmode_b = SPSig(TOP_0 + "mode_b")
m_Sigmode_res = SPSig(TOP_0 + "mode_res")
m_Sigmult_a = SPSig(TOP_0 + "mult_a")
m_Sigmult_b = SPSig(TOP_0 + "mult_b")
m_Sigmult_res = SPSig(TOP_0 + "mult_res")
m_Sigor_a = SPSig(TOP_0 + "or_a")
m_Sigor_b = SPSig(TOP_0 + "or_b")
m_Sigor_res = SPSig(TOP_0 + "or_res")
m_SigregA = SPSig(TOP_0 + "regA")
m_SigregB = SPSig(TOP_0 + "regB")
m_SigregC = SPSig(TOP_0 + "regC")
m_SigregD = SPSig(TOP_0 + "regD")
m_SigshiftL_a = SPSig(TOP_0 + "shiftL_a")
m_SigshiftL_b = SPSig(TOP_0 + "shiftL_b")
m_SigshiftL_res = SPSig(TOP_0 + "shiftL_res")
m_SigshiftR_a = SPSig(TOP_0 + "shiftR_a")
m_SigshiftR_b = SPSig(TOP_0 + "shiftR_b")
m_SigshiftR_res = SPSig(TOP_0 + "shiftR_res")
m_Sigsub_a = SPSig(TOP_0 + "sub_a")
m_Sigsub_b = SPSig(TOP_0 + "sub_b")
m_Sigsub_res = SPSig(TOP_0 + "sub_res")
m_Siguse_SpvTb = SPSig(TOP_0 + "use_SpvTb")
m_SigwireA = SPSig(TOP_0 + "wireA")
m_SigwireB = SPSig(TOP_0 + "wireB")
m_SigwireC = SPSig(TOP_0 + "wireC")
m_Sigwire_2_read = SPSig(TOP_0 + "wire_2_read")
m_Sigwire_2_write = SPSig(TOP_0 + "wire_2_write")
m_SigxorData1 = SPSig(TOP_0 + "xorData1")
m_Sigxor_a = SPSig(TOP_0 + "xor_a")
m_Sigxor_b = SPSig(TOP_0 + "xor_b")
m_Sigxor_res = SPSig(TOP_0 + "xor_res")
 

file SpvPython.py 

This file is the Python interface to SPV. It enables scheduling simulator events, calling Wait([event, time]), and declaring signals, generators and more. This is the main file that enables using Python as a verification environment.

from ctypes import *
from enum import Enum
import sys
import threading
import time
import datetime
import SPV 

 

class SPSig():
    """Accessor for one HDL signal, identified by its full path.

    All methods are indented uniformly; mixing 3- and 4-space method indents
    inside one class body is an IndentationError.
    """

    def __init__(self, SigFullPath):
        """Register ``SigFullPath`` with SPV and keep the returned id."""
        self.m_Name = SigFullPath
        self.SigId = SPV.InitSig(SigFullPath)

    def Set(self, value):
        """Write ``int(value)`` to the signal."""
        SPV.Set2Sig(int(value), self.SigId)

    def Val64(self):
        """Return what ``SPV.SigVal64`` reports for this signal."""
        return SPV.SigVal64(self.SigId)

    def Val32(self):
        """Return what ``SPV.SigVal32`` reports for this signal."""
        return SPV.SigVal32(self.SigId)

    def Name(self):
        """Return the full path the accessor was created with."""
        return self.m_Name

 

#need to implement
class SpvGen():
    """Handle to a generator that is looked up in SPV by its GUI name."""

    def __init__(self, GenGuiName):
        """Look up ``GenGuiName`` through ``SPV.GetGuiGen`` and keep its id."""
        self.m_Name = GenGuiName
        self.GenId = SPV.GetGuiGen(GenGuiName)

    def Gen(self):
        """Return the value ``SPV.Gen`` produces for this generator id."""
        # Return the value that was fetched; the undefined name ``genVal``
        # would raise NameError on every call.
        val = SPV.Gen(self.GenId)
        return val

class SPEvType(Enum):
    """Kinds of signal event a thread can wait for."""

    AtPos = 0
    AtNeg = 1
    AtChange = 2
    AtEqual = 3

    def Uint(self):
        """Return the integer value of the member."""
        return self.value

    def __str__(self):
        # NOTE(review): the format string never had a placeholder, so every
        # member prints as 'Event'; the output is kept unchanged -- confirm
        # whether the value was meant to be included.
        return 'Event'


# Short names for convenient use.
AtPos = SPEvType.AtPos
AtNeg = SPEvType.AtNeg
AtChange = SPEvType.AtChange
AtEqual = SPEvType.AtEqual
       

class SPSigEvent():
    """An event of type ``ev`` on signal ``sig``, registered with SPV."""

    def __init__(self, sig: SPSig, ev: SPEvType):
        """Create the SPV event for ``sig``.

        Args:
            sig: Signal accessor the event is attached to.
            ev:  Requested event type; an ``SPEvType`` member or a plain int.
        """
        self.m_Sig = sig
        # Pass the integer value of the requested type to SPV.  The event
        # type used to be hard-coded to 1, so ``ev`` was ignored.
        # NOTE(review): presumably the enum object itself was rejected by the
        # C side, which is why the integer is extracted here -- confirm.
        self.m_EvType = ev.Uint() if isinstance(ev, SPEvType) else int(ev)
        self.m_IsTime = 0
        # The event id comes from the C code of SPV.
        self.m_SpvEvId = SPV.InitSigEv(self.m_Sig.SigId, self.m_EvType)
   


# Workaround for print: SPV captures every print into its ActiveOut class,
# so PR writes to stderr instead (which works well) and also to Python.log.
pyLog = open('Python.log', 'w')


def PR(s):
    """Write ``str(s)`` as one line to stderr and to Python.log."""
    text = str(s)
    print(text, file=sys.stderr)
    pyLog.write(text + "\n")
    # Flush on every call so the log on disk is always current.
    pyLog.flush()
   
###############        Python-Spv    Scheduler
#these are the field that user should supply for any thread that used as SPV TCM 
#Wait(Event) and any other timming ability that comming from SPV.
class ThreadParam():
    """What the user supplies for a thread that runs as an SPV TCM."""

    def __init__(self, DefFunction, name: str, id: int):
        """Store the thread function, the thread name and the thread id."""
        self.ThreadFunction = DefFunction
        self.m_ThreadName = name
        self.ThreadId = id

#this class pass as is to SPV for any wait command. SPV create AtomicProcess CB  StartAProc for always.
#when the event occure, SPV mark     m_IsEvOccure = 1. the Scheduler function SPSchedulerCenter::Run
#run the event that m_IsEvOccure = 1, if there is again Wait we SPV check if the CB is for the same event.
#if the answer is True, it is just wait to next event occuration, else it kill the prev event and make new one.
#class OneCEvent(Structure):
#    _fields_ = [
#        ("m_SigInd",    c_uint),#the pointer of the signal that the event (Pos,Neg<change) wait for it.       
#        ("m_EventType", c_uint),#Pos,Neg,change
#        ("m_IsTime",    c_uint),#if is time it is Wait (time)
#        ("m_Delay",        c_uint),#the time in case m_IsTime = True
#        ("m_IsEvOccure",c_uint),#fill by SPV when the event occure.
#        ("m_IsAsPrev",    c_uint),#Python fill this, if it is as prev we don't create CB, just use the prev CB.
#        ("m_LastAProc", c_ulong)#the pointer of the last process control, if it is new Event we kill the old one, and create new one.
#        ]

# When Python (SPV) creates a thread it creates one OneCEventInfo instance,
# which holds the thread's current C event.
class OneCEventInfo():
    """Per-thread record kept by the scheduler."""

    def __init__(self, SpvTimeEvId: int, SpvEvId: int, ThreadBornIn: int, name: str):
        """Create the record; ``SpvTimeEvId`` is accepted but not stored."""
        # Time C event of this thread; the 999 arguments mark it as a time event.
        self.m_TSpvEvId = SPV.InitSigEv(999, 999)
        self.m_SpvEvId = SpvEvId
        # SPV index of the thread; the first thread is 0.
        self.m_BornInd = ThreadBornIn
        # Event object the thread blocks on (wait / clear / set).
        self.m_ThreadEv = threading.Event()
        self.m_Name = name
   
class SPSchedulerCenter():
    """Scheduler that runs user threads in step with the simulator.

    Control is passed back and forth with two kinds of ``threading.Event``:
    ``MainEvent`` releases the main thread that executes ``Run``, and the
    ``m_ThreadEv`` of a thread's ``OneCEventInfo`` releases that thread.
    """

    def __init__(self):
        self.numThread = 0          # unused; kept for existing readers
        self.ThreadParamList = []   # threads added but not started yet
        self.MainEvent = threading.Event()
        self.IsDebug = True
        self.OneCEventInfoLst = []  # one record per live thread
        self.CurrCEvInfo = None     # record of the thread released last
        self.ThreadCounter = 0
        self.m_ToStop = False
        self.IsThreadKilled = False

    def _YieldToScheduler(self, info):
        """Release the main thread and block until ``info`` is released."""
        # Clear the thread's own event BEFORE releasing the main thread;
        # clearing afterwards could erase a wake-up the scheduler already
        # issued.  ``info`` is captured by the caller for the same reason:
        # self.CurrCEvInfo may point at another thread once main runs.
        info.m_ThreadEv.clear()
        self.MainEvent.set()
        info.m_ThreadEv.wait()

    def WaitEv(self, SPSigEvent):
        """Block the calling thread until ``SPSigEvent`` occurs."""
        info = self.CurrCEvInfo
        prevSpvId = info.m_SpvEvId
        info.m_SpvEvId = SPSigEvent.m_SpvEvId  # updated in any case
        SPV.AddPyCb(prevSpvId, SPSigEvent.m_SpvEvId)
        self._YieldToScheduler(info)

    def WaitTime(self, timeDelay: int):
        """Block the calling thread for ``timeDelay`` (passed to SPV as is)."""
        info = self.CurrCEvInfo
        prevSpvId = info.m_SpvEvId
        info.m_SpvEvId = info.m_TSpvEvId  # switch to the thread's time event
        SPV.AddTimePyCb(prevSpvId, info.m_SpvEvId, timeDelay)
        self._YieldToScheduler(info)

    def SetDebug(self, isDebug: bool):
        """Turn the scheduler's debug messages on or off."""
        self.IsDebug = isDebug

    def DoStop(self):
        """Ask ``Run`` to return after the current pass over the threads."""
        self.m_ToStop = True

    def ThreadStartPoint(self, threadParam: "ThreadParam"):
        """Entry point of every scheduled thread.

        Calls the user function and, when it ends, removes the thread's
        record and releases the main thread.
        """
        if self.IsDebug:
            PR(threadParam.m_ThreadName + " StartUp with id " + str(threadParam.ThreadId))
        # Record that lives as long as the thread does.
        info = OneCEventInfo(888888888, 999999999, self.ThreadCounter, threadParam.m_ThreadName)
        self.CurrCEvInfo = info
        self.OneCEventInfoLst.append(info)
        self.ThreadCounter = self.ThreadCounter + 1
        try:
            threadParam.ThreadFunction(threadParam.ThreadId)
        finally:
            # Runs when the user function returns AND when it raises;
            # otherwise a failing thread would leave the main thread waiting
            # on MainEvent forever.
            indexToPop = self.OneCEventInfoLst.index(info)
            if self.IsDebug:
                PR("Kill thread " + str(indexToPop) + "  " + info.m_Name + " at time " + str(SPV.SimTime()))
            SPV.PyLastCbKill(info.m_SpvEvId)
            self.OneCEventInfoLst.pop(indexToPop)
            self.IsThreadKilled = True
            self.MainEvent.set()

    def Run(self):
        """Start all added threads, then serve their events until done.

        Returns when no thread is left or after ``DoStop`` was called.
        """
        self.m_ToStop = False  # allow continuing after an error or user stop
        while True:
            # Start every new thread and wait until it blocks or ends.
            for ThP in self.ThreadParamList:
                theThread = threading.Thread(name=ThP.m_ThreadName, target=self.ThreadStartPoint, args=(ThP,))
                theThread.start()
                self.MainEvent.wait()
                self.MainEvent.clear()
                # A thread that ended without ever waiting has already
                # removed its record; do not carry the flag into the event
                # loop, where it would be charged to another thread.
                self.IsThreadKilled = False
            self.ThreadParamList.clear()
            if len(self.OneCEventInfoLst) == 0:
                return

            # Release each thread whose event has occurred.
            listCEVLen = len(self.OneCEventInfoLst)
            currCEVInd = 0
            while currCEVInd < listCEVLen:
                advance = 1
                if SPV.IsEvOccure(self.OneCEventInfoLst[currCEVInd].m_SpvEvId):
                    self.CurrCEvInfo = self.OneCEventInfoLst[currCEVInd]
                    self.CurrCEvInfo.m_ThreadEv.set()
                    self.MainEvent.wait()
                    self.MainEvent.clear()
                    if self.IsThreadKilled:
                        # The record was removed, so the next record now
                        # sits at the current index: do not advance.
                        listCEVLen = listCEVLen - 1
                        self.IsThreadKilled = False
                        advance = 0
                        if self.IsDebug:
                            PR("  currCEVInd " + str(currCEVInd))
                currCEVInd = currCEVInd + advance

            # When all threads have finished, return to the prompt.
            if len(self.OneCEventInfoLst) == 0:
                return
            if self.m_ToStop:
                return
            SPV.PSRun()

    def AddThread(self, functionPointer, name: str, id: int):
        """Queue ``functionPointer(id)`` to be started by ``Run``."""
        self.ThreadParamList.append(ThreadParam(functionPointer, name, id))


GlobSched = SPSchedulerCenter()  # global scheduler

       

file Test0.py 

An example test. It imports everything needed to run and starts all the threads (TCMs, which can call Wait([event], [time])).

from SpvPython import *
from AllSig import *
import SPV
from SPV import *
import time
import datetime
import DriveOperand
#from datetime import datetime
import threading

def Test():
    """Enable the SPV testbench, start every operand driver and run them."""
    PR("Start Test")
    # The test itself.
    SPV.Wait(1)
    m_Siguse_SpvTb.Set(1)

    # One thread per operand driver, started in this order; each thread is
    # named after its function.
    driveProcs = (
        DriveOperand.DriveAdd,
        DriveOperand.DriveSub,
        DriveOperand.DriveOr,
        DriveOperand.DriveXor,
        DriveOperand.DriveMult,
        DriveOperand.DriveMode,
        DriveOperand.DriveAnd,
        DriveOperand.DriveShiftL,
        DriveOperand.DriveShiftR,
        DriveOperand.DriveCrcData,
        DriveOperand.DriveCurled,
    )
    for proc in driveProcs:
        GlobSched.AddThread(proc, proc.__name__, 0)

    GlobSched.Run()


 

file DriveOperand.py 

This file includes all the TCMs (threads), one per operand in Verilog. Each drives and checks the design every clock, reports errors, and prints a summary at the end.

#import SPSched
from SpvPython     import *
from AllSig     import *
import             random 
from random import randrange #, uniform
from datetime import datetime
from SPV import *


LoopSize = 40001  # iterations of every driver loop in this file

#operator Add + 

def DriveAdd(id: int):
    """Drive random operands into the adder and check its 32-bit result.

    After the first rising edge of ``clk_add`` the loop runs ``LoopSize``
    times: wait ``WaitTime(40)``, compare the HDL result with the value
    expected from the previous operands, then drive new operands.  The
    function returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_AddClkPos = SPSigEvent(m_Sigclk_add, AtPos)
    GlobSched.WaitEv(m_AddClkPos)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitTime(40)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigadd_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveAdd: Err at ind " + str(ActionInd) + " adda = " + str(adda) + " addb=")
                PR(str(addb) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        adda = SPV.Gen() % 0x6FFFFFFF
        addb = SPV.Gen() % 0x6FFFFFFF
        res = (adda + addb) & 0xFFFFFFFF  # expected result
        m_Sigadd_a.Set(adda)
        m_Sigadd_b.Set(addb)
    PR("DriveAdd:    numErr=" + str(numErr) + " numOk=" + str(numOk))

####   operator SUB - 

def DriveSub(id: int):
    """Drive random operands into the subtractor and check its 32-bit result.

    After the first rising edge of ``clk_sub`` the loop runs ``LoopSize``
    times: wait ``WaitTime(50)``, compare the HDL result with the value
    expected from the previous operands, then drive new operands.  The
    function returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_SubClkPos = SPSigEvent(m_Sigclk_sub, AtPos)
    GlobSched.WaitEv(m_SubClkPos)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitTime(50)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigsub_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveSub: Error at ind " + str(ActionInd) + " suba = " + str(suba) + " subb=")
                # No leading '+': unary plus on a str raises TypeError.
                PR(str(subb) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        suba = SPV.Gen() % 0x6FFFFFFF
        subb = SPV.Gen() % 0x6FFFFFFF
        res = (suba - subb) & 0xFFFFFFFF  # expected result
        m_Sigsub_a.Set(suba)
        m_Sigsub_b.Set(subb)
    PR("DriveSub:    numErr=" + str(numErr) + " numOk=" + str(numOk))

 

     

#### operator OR | 

def DriveOr(id: int):
    """Drive random operands into the OR unit and check its 32-bit result.

    On every ``clk_or`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    loop runs ``LoopSize`` times and returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_OrClkNeg = SPSigEvent(m_Sigclk_or, AtNeg)
    GlobSched.WaitEv(m_OrClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_OrClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigor_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveOr:    Error at ind " + str(ActionInd) + " ora = " + str(ora) + " orb=")
                PR(str(orb) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        # Operands for the next iteration.
        ora = SPV.Gen()
        orb = SPV.Gen()
        res = (ora | orb) & 0xFFFFFFFF  # expected result
        m_Sigor_a.Set(ora)
        m_Sigor_b.Set(orb)
    PR("DriveOr:    numErr=" + str(numErr) + " numOk=" + str(numOk))

#### Operator xor ^

def DriveXor(id: int):
    """Drive random operands into the XOR unit and check its 32-bit result.

    On every ``clk_xor`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    loop runs ``LoopSize`` times and returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_XorClkNeg = SPSigEvent(m_Sigclk_xor, AtNeg)
    GlobSched.WaitEv(m_XorClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(LoopSize):
        GlobSched.WaitEv(m_XorClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigxor_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveXor:    Error at ind " + str(ActionInd) + " xora = " + str(opA) + " xorb=" + str(opB) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr += 1
                return
            numOk += 1
        opA = SPV.Gen()
        opB = SPV.Gen()
        res = (opA ^ opB) & 0xFFFFFFFF  # expected result
        m_Sigxor_a.Set(opA)
        m_Sigxor_b.Set(opB)
    PR("DriveXor:    numErr=" + str(numErr) + " numOk=" + str(numOk))

#### operator Mode %

 

def DriveMode(id: int):
    """Drive random operands into the modulo unit and check its 32-bit result.

    On every ``clk_mode`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    loop runs ``LoopSize`` times and returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id = " + str(id) + " start at T: " + str(SPV.SimTime()))
    m_ModeClkNeg = SPSigEvent(m_Sigclk_mode, AtNeg)
    GlobSched.WaitEv(m_ModeClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_ModeClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigmode_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveMode:    Error at ind " + str(ActionInd) + " modea = " + hex(modea))
                PR(" modeb=" + hex(modeb) + " res=" + hex(res) + " resHdl=" + hex(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        modea = SPV.Gen() & 0x7FFFFFFF
        # A zero divisor would raise ZeroDivisionError below; use 1 instead.
        modeb = (SPV.Gen() & 0x7FFFFFFF) or 1
        res = (modea % modeb) & 0xFFFFFFFF  # expected result
        m_Sigmode_a.Set(modea)
        m_Sigmode_b.Set(modeb)
    PR("DriveMode:    numErr=" + str(numErr) + " numOk=" + str(numOk))

 

#### operator Mult *

def DriveMult(id: int):
    """Drive random operands into the multiplier and check its 32-bit result.

    On every ``clk_mult`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    loop runs ``LoopSize`` times and returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_MultClkNeg = SPSigEvent(m_Sigclk_mult, AtNeg)
    GlobSched.WaitEv(m_MultClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_MultClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigmult_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveMult:    Error at ind " + str(ActionInd) + " multa = " + str(multa))
                PR(" multb=" + str(multb) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        multa = SPV.Gen()
        multb = SPV.Gen()
        res = (multa * multb) & 0xFFFFFFFF  # expected result
        m_Sigmult_a.Set(multa)
        m_Sigmult_b.Set(multb)
    PR("DriveMult:    numErr=" + str(numErr) + " numOk=" + str(numOk))

 

#### operator And &
def DriveAnd(id: int):
    """Drive random operands into the AND unit and check its 32-bit result.

    On every ``clk_and`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    loop runs ``LoopSize`` times and returns at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id=" + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_AndClkNeg = SPSigEvent(m_Sigclk_and, AtNeg)
    GlobSched.WaitEv(m_AndClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_AndClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_Sigand_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveAnd:    Error at ind " + str(ActionInd) + " anda = " + str(anda) + " andb=" + str(andb) + " res=" + str(res) + " resHdl=" + str(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        anda = SPV.Gen()
        andb = SPV.Gen()
        res = (anda & andb) & 0xFFFFFFFF  # expected result
        m_Sigand_a.Set(anda)
        m_Sigand_b.Set(andb)
    PR("DriveAnd:    numErr=" + str(numErr) + " numOk=" + str(numOk))

def DriveShiftL(id: int):
    """Drive random operands into the left shifter and check its 32-bit result.

    On every ``clk_shiftL`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    shift amount is in 0..30.  The loop runs ``LoopSize`` times and returns
    at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_ShiftLClkNeg = SPSigEvent(m_Sigclk_shiftL, AtNeg)
    GlobSched.WaitEv(m_ShiftLClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_ShiftLClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_SigshiftL_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveShiftL:    Error at ind " + str(ActionInd) + " shiftLa = " + hex(shiftLa) + " shiftLb=" + hex(shiftLb) + " res=" + hex(res) + " resHdl=" + hex(resHdl))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        shiftLa = SPV.Gen()
        shiftLb = SPV.Gen() % 31
        res = (shiftLa << shiftLb) & 0xFFFFFFFF  # expected result
        m_SigshiftL_a.Set(shiftLa)
        m_SigshiftL_b.Set(shiftLb)
    PR("DriveShiftL:    numErr=" + str(numErr) + " numOk=" + str(numOk))

def DriveShiftR(id: int):
    """Drive random operands into the right shifter and check its 32-bit result.

    On every ``clk_shiftR`` event the HDL result is compared with the value
    expected from the previous operands, then new operands are driven.  The
    shift amount is in 0..31.  The loop runs ``LoopSize`` times and returns
    at the first mismatch.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_ShiftRClkNeg = SPSigEvent(m_Sigclk_shiftR, AtNeg)
    GlobSched.WaitEv(m_ShiftRClkNeg)
    numOk = 0
    numErr = 0
    for ActionInd in range(0, LoopSize):
        GlobSched.WaitEv(m_ShiftRClkNeg)
        # Nothing has been driven yet on the first pass, so nothing to check.
        if ActionInd > 0:
            resHdl = m_SigshiftR_res.Val32() & 0xFFFFFFFF
            if res != resHdl:
                PR("DriveShiftR:    Error at ind " + str(ActionInd) + " ShiftRa=" + hex(ShiftRa) + "ShiftRb=")
                # SimTime() is the time call used by every other message here.
                PR(hex(ShiftRb) + " res=" + hex(res) + " resHdl=" + hex(resHdl) + " T:" + str(SPV.SimTime()))
                numErr = numErr + 1
                return
            else:
                numOk = numOk + 1
        ShiftRa = SPV.Gen()
        ShiftRb = SPV.Gen() % 32
        res = (ShiftRa >> ShiftRb) & 0xFFFFFFFF  # expected result
        m_SigshiftR_a.Set(ShiftRa)
        m_SigshiftR_b.Set(ShiftRb)
    PR("DriveShiftR:    numErr=" + str(numErr) + " numOk=" + str(numOk))
   #print(datetime.now());    

#### Drive crc bits 
def DriveCrcData(id: int):
    """Drive random values into ``Data1`` on every ``crc_clock`` event.

    Runs ``LoopSize`` iterations; nothing is checked.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_CrcClkNeg = SPSigEvent(m_Sigcrc_clock, AtNeg)
    GlobSched.WaitEv(m_CrcClkNeg)
    for ActionInd in range(0, LoopSize):
        # NOTE(review): Data1 is written twice per iteration; the second
        # write may have been meant for another signal -- confirm.  Both
        # calls are kept so the generator sequence stays the same.
        m_SigData1.Set(SPV.Gen())
        m_SigData1.Set(SPV.Gen())
        GlobSched.WaitEv(m_CrcClkNeg)
    PR("DriveCrcData:    finish ")

#### Cruly command {a,b,c} = {d,e,f} 

def DriveCurled(id: int):
    """Drive random values into regA..regD on every ``curled_clk`` event.

    Runs ``LoopSize`` iterations; nothing is checked.  Prints the wall-clock
    time when finished.

    Args:
        id: Thread id; only used in the start message.
    """
    PR(threading.current_thread().name + " thread id = " + str(id) + " start at Time: " + str(SPV.SimTime()))
    m_CurledClkNeg = SPSigEvent(m_Sigcurled_clk, AtNeg)
    GlobSched.WaitEv(m_CurledClkNeg)
    for ActionInd in range(0, LoopSize):
        m_SigregA.Set(SPV.Gen())
        m_SigregB.Set(SPV.Gen())
        m_SigregC.Set(SPV.Gen())
        m_SigregD.Set(SPV.Gen())
        GlobSched.WaitEv(m_CurledClkNeg)
    PR("DriveCurled:    finish ")
    print(datetime.now())

 

Result of python simulation with wave

WaveFormBig.png
bottom of page