This is the mail archive of the libc-help@sourceware.org mailing list for the glibc project.


Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]
Other format: [Raw text]

vectorized strstr et al


Here is my incomplete patch to speed up string functions. I wrote a
generic loop (string/base/loop.h) to quickly enumerate occurrences of a
simple pattern.

In strstr the main idea is to seek a pair of characters. We assume that
any given digram is rare in text (for example, in English "th" is the
most common one, with a probability of about 1%).

For the two-way algorithm an easy modification is to seek the first digram
with SSE (as the function strstr_two_way will do).
However, calculating the critical factorization is quite expensive and
almost never pays for itself.

Theoretically it is a buy-or-rent problem: if we switch when the overhead
equals the time to compute the factorization, our time is always at most
twice that of the optimal decision.

Still, startup time can be improved, so we use the same idea to match the
first 128 characters with an algorithm that has a smaller startup time (strstr).

strcasestr is a different beast. If I could precompute a per-locale
256-byte table, then I could obtain almost the same speed as strstr for
mostly-ASCII text.

I applied the same loop to strlen and strchr as well. When compiled by gcc
their speed is (modulo prefetching) the same as the current version.
The assembler output from gcc is quite suboptimal. It could be improved by
modifying it by hand or by compiling with icc.

The loop is written the way it is in order to get better assembly
output.



---
 string/base/chr.c       |   29 ++++++++
 string/base/len.c       |   16 +++++
 string/base/loop.h      |  122 +++++++++++++++++++++++++++++++++
 string/base/str.c       |  172 +++++++++++++++++++++++++++++++++++++++++++++++
 sysdeps/x86_64/sse2.h   |  100 +++++++++++++++++++++++++++
 sysdeps/x86_64/ssse3.h  |    4 +
 sysdeps/x86_64/strstr.c |    3 +
 7 files changed, 446 insertions(+), 0 deletions(-)
 create mode 100644 string/base/chr.c
 create mode 100644 string/base/len.c
 create mode 100644 string/base/loop.h
 create mode 100644 string/base/str.c
 create mode 100644 sysdeps/x86_64/sse2.h
 create mode 100644 sysdeps/x86_64/ssse3.h
 create mode 100644 sysdeps/x86_64/strstr.c

diff --git a/string/base/chr.c b/string/base/chr.c
new file mode 100644
index 0000000..67634c0
--- /dev/null
+++ b/string/base/chr.c
@@ -0,0 +1,29 @@
+const int unroll=4,prefetch=8; /* read by loop.h/sse2.h: PARA is BYTES_AT_ONCE*unroll, and prefetch is multiplied by CACHE_LINE_SIZE for the PREFETCH offset */
+
+#define TEST_CODE(so,sn) get_mask(TEST_EQ(sn,vc)) /* one mask bit per byte of sn that equals the broadcast character vc */
+#define LOOP_BODY(p) return p; /* every variant returns the first reported position */
+//#define FAST_START
+
+#ifdef AS_MEMCHR
+  uchar* memchr(   const uchar *s, int c , size_t ss)
+  #define DETECT_END s+ss /* address at which loop.h reports the end condition */
+  #define LOOP_END(p) return NULL;
+#endif
+#ifdef AS_RAWMEMCHR
+  uchar* rawmemchr(const uchar *s, int c , size_t ss) /* NOTE(review): ss is not read by this variant (no DETECT_END is defined) -- confirm the extra parameter is intended */
+  #define LOOP_END(p) /*cannot happen*/
+#endif
+#ifdef AS_STRCHR
+  uchar* strchr(   const uchar *s, int c )
+  #define DETECT_ZERO_BYTE
+  #define LOOP_END(p) return NULL; /* the zero byte is tested before LOOP_BODY, so reaching it yields NULL */
+#endif
+#ifdef AS_STRCHRNUL
+  uchar* strchrnul(const uchar *s, int c )
+  #define DETECT_ZERO_BYTE
+  #define LOOP_END(p) return p; /* at the zero byte, return its address instead of NULL */
+#endif
+{
+  tp_vector vc=BROADCAST(c); /* c replicated into every byte lane, compared by TEST_CODE */
+  #include "loop.h"
+}
diff --git a/string/base/len.c b/string/base/len.c
new file mode 100644
index 0000000..fbffc26
--- /dev/null
+++ b/string/base/len.c
@@ -0,0 +1,16 @@
+const int unroll=4,prefetch=8; /* read by loop.h/sse2.h: PARA is BYTES_AT_ONCE*unroll, and prefetch is multiplied by CACHE_LINE_SIZE for the PREFETCH offset */
+
+#define DETECT_ZERO_BYTE
+#define TEST_CODE(so,sn) 0 /* no pattern to match: only the end conditions set mask bits */
+#define LOOP_BODY(p) return p-s; /* LOOP_END is empty, so the end position falls through to here and its offset from s is the length */
+#define LOOP_END(p)  /*empty*/
+
+#ifdef NVERSION
+  size_t strnlen( const uchar *s , size_t ss ) /* s is only read, so it is const like the chr.c signatures */
+  #define DETECT_END s+ss
+#else
+  size_t strlen(  const uchar *s ) /* s is only read, so it is const like the chr.c signatures */
+#endif
+{
+  #include "loop.h"
+}
diff --git a/string/base/loop.h b/string/base/loop.h
new file mode 100644
index 0000000..b81de50
--- /dev/null
+++ b/string/base/loop.h
@@ -0,0 +1,122 @@
+/* Basic loop - string functions are stream oriented. We use a common loop to quickly find patterns in a stream.
+
+
+  Finding patterns is done using vector instructions (see base/vector.h). With SSE2 they manipulate 16 bytes at once.
+
+
+
+  Finding a pattern is done by the function-specific macro TEST_CODE
+  TEST_CODE(so,sn)
+  which gets as input the vector sn of the current 16 bytes and the vector so containing the previous 16 bytes.
+  It produces a mask with the i-th bit set to 1 iff we found a pattern ending on the i-th byte.
+
+  Then all occurrences of the pattern are enumerated; for each one we call the macro
+  LOOP_BODY(p) where p is a pointer to the end of the pattern.
+  In the LOOP_BODY macro you can access the variables
+  s      start of string
+  mask   which contains the mask; you can suppress future LOOP_BODY calls by zeroing the corresponding bits.
+
+  If you only need to skip a character interval, define the macro
+  CAN_SKIP
+  then you can assign to the pointer skip_to, and patterns ending before this position will be skipped.
+  The end condition will be invoked even if the position is skipped.
+
+  The end condition can be specified by the macros
+  DETECT_ZERO_BYTE    which ends the loop when we find a zero byte, and
+  DETECT_END          which expands to the pointer where the loop ends.
+  When this end condition occurs the macro
+  LOOP_END(p) is called.
+
+
+*/
+
+
+
+#ifdef DETECT_ZERO_BYTE
+  #define _DETECT_ZERO_BYTE(u) zmask##u= find_zeros(sn); /* per-vector mask of zero bytes in the vector just loaded */
+#else
+  #define _DETECT_ZERO_BYTE(u) zmask##u= 0; /* function-like so the (u) call site expands fully; zmaskN must be 0, not indeterminate, because it is OR-ed in later */
+#endif
+#ifdef DETECT_END
+  #define _DETECT_END(u) if ((DETECT_END)<s2+u*BYTES_AT_ONCE) zmask = ((tp_mask)1)<<((DETECT_END)-s2); /* strict '<': an end exactly u vectors ahead would need a shift by the full mask width; it is reported as bit 0 of the next block instead */
+#else
+  #define _DETECT_END(u) /* no explicit end pointer for this function */
+#endif
+
+#define _TEST_CODE(u) \
+  mask=0;\
+  so=sn;\
+  sn=LOAD(s2+u*VSIZ_BYTE);\
+  _DETECT_ZERO_BYTE(u);\
+  mask    = TEST_CODE(so,sn); \
+  mask##u = mask; /* _TEST_CODE(u): keep the previous vector in so, load vector number u of the block at s2 into sn, then store the function-specific match mask in mask##u */
+
+  
+#define TEST_BODY4 \
+zmask=0;\
+_DETECT_END(4);\
+_TEST_CODE(0);_TEST_CODE(1);_TEST_CODE(2);_TEST_CODE(3);\
+zmask=(zmask0|zmask1)|(zmask2|zmask3)|zmask;\
+ mask=( mask0| mask1)|( mask2| mask3);
+
+
+#ifndef LOOP_END
+  #define LOOP_END(p)
+#endif
+
+#define _SKIP_BODY if      (s2+PARA <=skip_to) mask=zmask;\
+                   else if (s2      < skip_to) mask=forget_low_bits(mask,skip_to-s2) | zmask; /* drops pending match bits below skip_to but always keeps the end-condition bits in zmask */
+
+#ifdef CAN_SKIP
+  #define SKIP_BODY _SKIP_BODY
+#else
+  #define SKIP_BODY 
+#endif
+
+#define LOOP_TEST     \
+     if(__builtin_expect(mask|zmask,0)){ /* hinted as the unlikely path: some vector of the block reported a match or an end */ \
+       mask =(( mask0<<(0*VSIZ_BYTE))|( mask1<<(1*VSIZ_BYTE)))|(( mask2<<(2*VSIZ_BYTE))|( mask3<<(3*VSIZ_BYTE)));\
+       zmask=((zmask0<<(0*VSIZ_BYTE))|(zmask1<<(1*VSIZ_BYTE)))|((zmask2<<(2*VSIZ_BYTE))|(zmask3<<(3*VSIZ_BYTE)));\
+       mask = mask|zmask; /* bit i now refers to byte s2+i; end positions are enumerated together with matches */ \
+       LOOP_TEST_START /* the backslash must be the last character on the line to continue the macro in standard C */ \
+       SKIP_BODY\
+       enum_bits_start(mask,i)\
+         uchar *p=s2+i;\
+         if(__builtin_expect(get_bit(zmask,i),0)){\
+           LOOP_END(p)\
+         }\
+         LOOP_BODY(p)\
+         SKIP_BODY\
+       enum_bits_end\
+     }
+
+   int i; /* bit index written by enum_bits_start inside LOOP_TEST */
+   tp_vector vzero=BROADCAST_ZERO();
+   tp_vector sn,so;
+   tp_mask mask,mask0,mask1,mask2,mask3,zmask,zmask0,zmask1,zmask2,zmask3;
+   int s_offset; uchar* s2; /* both set by ALIGN: s2 is s rounded down to a PARA boundary, s_offset the distance back up to s */
+   sn=vzero; /* the first block has no predecessor, so TEST_CODE sees zeros as the "previous" vector */
+#ifdef FAST_START
+   abort(); /*TODO faster startup*/
+#else
+  ALIGN(s,unroll);
+  TEST_BODY4;
+  #define LOOP_TEST_START mask=forget_low_bits(mask,s_offset); 
+  LOOP_TEST
+  #undef  LOOP_TEST_START
+  #define LOOP_TEST_START
+  s2+=PARA;
+  while(1){ /* left only through a return inside LOOP_BODY or LOOP_END */
+     TEST_BODY4;
+     if(prefetch) PREFETCH(s2+prefetch*CACHE_LINE_SIZE);
+     LOOP_TEST
+     s2+=PARA;
+   }
+#endif
+
+#undef DETECT_ZERO_BYTE
+#undef DETECT_END
+#undef _DETECT_ZERO_BYTE
+#undef _DETECT_END
+#undef TEST_BODY4
+#undef LOOP_TEST_START
diff --git a/string/base/str.c b/string/base/str.c
new file mode 100644
index 0000000..d041695
--- /dev/null
+++ b/string/base/str.c
@@ -0,0 +1,172 @@
+#define NDEBUG
+#include <assert.h>
+const int unroll=4,prefetch=8,phase1=2,phase2=8; /* unroll and prefetch are read through loop.h/sse2.h; phase1 and phase2 are not referenced anywhere in the visible patch */
+const int small_treshold=128; /* haystack offset at which strstr/memmem hand over to strstr_medium */
+static uchar* strstr_medium(uchar *s,uchar *n,int ns,uchar *s_end);
+static uchar* strstr_two_way(uchar *s,int ss, uchar *n,int ns);
+
+#ifdef AS_STRSTR
+  #define STRCHR(s,c) strchr(s,c)
+  #define ND_END(x) !n[x]
+  #define HS_END(x) !s2[x] /* relative to the current candidate s2, like every other index in the loop; testing s[x] let the search step past the terminator */
+  uchar *strstr(const uchar *s,const uchar *n)
+#endif
+#ifdef AS_MEMMEM
+  #define STRCHR(s,c) strchr(s,s_end-s,c) /* NOTE(review): three arguments, but strchr in chr.c takes two, and s_end is NULL below -- the memmem variant looks unfinished */
+  #define ND_END(x) (x==ns)
+  #define HS_END(x) (x==ss)
+  uchar *memmem(const uchar *s,size_t ss,const uchar *n,size_t ns)
+#endif
+{
+  uchar *s_end=NULL;
+  uchar *s2=s; /* start of the current candidate match */
+  int i,cnt=0; /* cnt: number of times the needle tail matched from offset m */
+  int m=0; /* needle offset of the last mismatch; the next scan looks for n[m] at s2+m */
+  if(ND_END(0)) return s2; /* an empty needle matches at the start of the haystack */
+  s2=STRCHR(s2+m,n[m]);
+  while(s2){
+    s2=s2-m; /* STRCHR found n[m], so the candidate starts m bytes earlier */
+    for (i=m; !ND_END(i) && n[i]==s2[i]; i++);
+    if (ND_END(i)){
+      cnt++;
+      if(cnt==4) return strstr_medium(s2,n,i,s_end); /* tail matches keep recurring: switch to the vectorised search */
+      for(i=0;i<m && n[i]==s2[i]; i++);
+      if(i==m) return s2; /* prefix n[0..m-1] matched as well: full match */
+    }
+    m=i;
+    if(HS_END(m)) return NULL; /* the mismatch was the end of the haystack */
+    if(s2-s>=small_treshold) return strstr_medium(s2,n,strlen(n),s_end);
+    s2++;
+    s2=STRCHR(s2+m,n[m]);
+  }
+  return NULL;
+}
+
+static size_t strcmp_dir(const uchar *a,const uchar *b,size_t no,int dir){ /* number of leading byte pairs that agree (at most no), moving both pointers by dir after each match */
+  size_t same=0; /* size_t like the bound, so the comparison is not signed-vs-unsigned */
+  while(same<no && *a==*b){a+=dir;b+=dir;same++;}
+  return same;
+}
+
+static uchar* strstr_medium(uchar *s,uchar *n,int ns,uchar *s_end){
+  assert(ns>1);//ns=1 is handled by strstr alone.
+  /* Vector-scan s for the last two needle bytes, then verify the remaining ns-2 bytes with strcmp_dir. */
+  tp_vector vn0=BROADCAST(n[ns-1-0]); 
+  tp_vector vn1=BROADCAST(n[ns-1-1]);
+  tp_vector e0,e1;
+  int rent=0,buy=8*ns; /* rent: bytes compared during verification so far; buy: base budget before handing over to strstr_two_way */
+
+  #define TEST_CODE(so,sn) 0;\
+     e0 = XOR(CONCAT(so,sn,BYTES_AT_ONCE-0),vn0); \
+     e1 = XOR(CONCAT(so,sn,BYTES_AT_ONCE-1),vn1); \
+     mask = find_zeros(OR(e0,e1));
+
+  #define LOOP_BODY(p) \
+    int checked=strcmp_dir(p,n+ns-1-2,ns-2,-1); \
+    if (checked==ns-2) return p-ns+1; \
+    rent+=checked;\
+    if( rent>buy+(s2-s)/4) return strstr_two_way(p-ns+1,strlen(p-ns+1),n,ns);
+
+  #define LOOP_END(p) return NULL;
+  #define CAN_SKIP
+  uchar *skip_to=s+ns-1; /* a match returns p-ns+1, so no match can end before s+ns-1; skip_to is compared with haystack positions and must point into s, not n */
+  #include "loop.h" /* NOTE(review): neither DETECT_ZERO_BYTE nor DETECT_END is defined before this include, so LOOP_END is never reached -- confirm how the search is meant to stop */
+}
+
+static inline int max(int x,int y){ if (x>y) return x; return y; } /* larger of x and y */
+static inline int min(int x,int y){ if (x<y) return x; return y; } /* smaller of x and y */
+
+/* Two-way algorithm: CROCHEMORE M., PERRIN D., 1991, Two-way string-matching, Journal of the ACM 38(3):651-675.
+   Implementation based on http://www-igm.univ-mlv.fr/~lecroq/string/node26.html
+*/
+
+static int periodic(uchar *a,uchar *b,int siz){ /* 1 iff the first siz bytes at a and at b are equal, else 0 */
+  return strcmp_dir(a,b,siz,1)==siz;
+}
+static int maxSuf(uchar *n, int ns, int *p,int ord) { /* scans needle n of length ns; returns ms and stores a period value in *p. Presumably the maximal-suffix step of the two-way algorithm -- confirm against the reference */
+   int ms, j, k;
+   uchar a, b;
+
+   ms = -1;
+   j = 0;
+   k = *p = 1;
+   while (j + k < ns) {
+      a = n[  j + k];
+      b = n[ ms + k];
+      if (ord ? a < b : a > b) { /* ord selects the byte ordering: '<' when nonzero, '>' when zero */
+         j += k;
+         k = 1;
+         *p = j - ms;
+      }
+      else
+         if (a == b)
+            if (k != *p)
+               ++k;
+            else {
+               j += *p;
+               k = 1;
+            }
+         else { /* the remaining ordering case: a > b when ord is nonzero, a < b when it is zero */
+            ms = j;
+            j = ms + 1;
+            k = *p = 1;
+         }
+   }
+   return(ms);
+}
+static void two_way_preprocessing(uchar *n,int ns,int *per2,int *ell2,int *peri){
+  int u,v,up,vp;
+  int per,ell;
+  /* Runs maxSuf under both orderings and keeps the larger result as ell, with its period as per. */
+  u=maxSuf(n,ns,&up,0);
+  v=maxSuf(n,ns,&vp,1);
+  ell = (u > v) ? u :  v;
+  per = (u > v) ? up : vp;
+  *peri = periodic(n, n + per, ell + 1); /* nonzero iff the first ell+1 bytes repeat at distance per */
+  if (!*peri)
+    per = max(ell + 1, ns - ell - 1) + 1; /* replacement shift used when the needle is not periodic */
+  *per2=per;
+  *ell2=ell;
+}
+
+static uchar* strstr_two_way(uchar *s,int ss, uchar *n,int ns){
+  int per,ell,peri;
+  two_way_preprocessing(n,ns,&per,&ell,&peri);
+
+  int chk  = min(ell+2,ns-1);
+  int fwn  = ns-chk; /* number of needle bytes compared forwards from offset ns-fwn */
+  int bwn  = chk+1-2; 
+  /* The vector loop reports where needle bytes chk-2 and chk-1 occur; LOOP_BODY then verifies the rest. */
+  tp_vector vn0=BROADCAST(n[chk-1-0]); 
+  tp_vector vn1=BROADCAST(n[chk-1-1]);
+  tp_vector e0,e1;
+  
+  #define TEST_CODE(so,sn) 0;\
+     e0 = XOR(CONCAT(so,sn,BYTES_AT_ONCE-0),vn0); \
+     e1 = XOR(CONCAT(so,sn,BYTES_AT_ONCE-1),vn1); \
+     mask = find_zeros(OR(e0,e1));
+  
+  #undef LOOP_BODY
+  #define LOOP_BODY(p) \
+      p = p - chk; /*p is now start of string*/ \
+      if (!peri){\
+        int fwcheck   = strcmp_dir(p+ns-fwn, n+ns-fwn, fwn,  1);\
+        if(   fwcheck != fwn ){\
+          skip_to   = p+ns-fwn+ fwcheck;\
+        } else {\
+          int bwcheck = strcmp_dir(p+bwn,    n+bwn,    bwn, -1);\
+          if (bwcheck != bwn){\
+            skip_to = p+ns-fwn+ per;\
+          } else {\
+            return p;\
+          }\
+        }\
+      } else { \
+        abort();\
+      }
+
+  #define LOOP_END(p) return NULL;
+  #define CAN_SKIP
+  uchar *skip_to=s+chk; /* LOOP_BODY takes p-chk as the match start, so nothing can end before s+chk; skip_to is compared with haystack positions and must point into s, not n */
+  #include "loop.h"
+}
diff --git a/sysdeps/x86_64/sse2.h b/sysdeps/x86_64/sse2.h
new file mode 100644
index 0000000..6ddcfd6
--- /dev/null
+++ b/sysdeps/x86_64/sse2.h
@@ -0,0 +1,100 @@
+#include <stdint.h>
+#include <emmintrin.h>
+#include <tmmintrin.h>
+#include <stdlib.h>
+
+typedef unsigned char uchar;
+typedef __m128i tp_vector;
+typedef unsigned long tp_mask; /* unsigned: per-vector masks are shifted up to bit 63 and shifted back down, which is only well defined on an unsigned type; it also matches the argument type of __builtin_ctzl */
+
+#define SI static inline
+#define BYTES_AT_ONCE sizeof(tp_vector)
+#define PARA (BYTES_AT_ONCE*unroll) /* bytes handled per loop iteration; unroll comes from the including .c file */
+#define VSIZ_BYTE sizeof(tp_vector)
+#define VSIZ_BIT  (VSIZ_BYTE*8)
+#define MSIZ_BYTE sizeof(tp_mask)
+#define MSIZ_BIT  (MSIZ_BYTE*8)
+
+#define CACHE_LINE_SIZE 64
+#define PREFETCH(x)	_mm_prefetch(((char *)x),_MM_HINT_T0); /* note: the expansion already ends in ';' */
+#define ALIGN(x,u)         s_offset=((size_t) x)%((u)*BYTES_AT_ONCE);           s2=((uchar *)x)-s_offset; /* writes the caller's s_offset and s2: s2 is x rounded down to a multiple of u vectors */
+
+SI tp_mask get_mask(tp_vector x){ tp_mask bits=_mm_movemask_epi8(x); return bits; } /* one result bit per byte lane, taken from each lane's top bit */
+
+SI tp_mask first_bit(tp_mask t){ return __builtin_ctzl(t);} /* index of the lowest set bit; t must be nonzero */
+SI tp_mask  last_bit(tp_mask t){ return MSIZ_BIT-1-__builtin_clzl(t);} /* index of the highest set bit; clzl alone is the count of leading zeros, not an index. t must be nonzero */
+
+SI tp_mask forget_first_bit(tp_mask t){return t&(t-1);} /* t with its lowest set bit cleared */
+#define enum_bits_start(msk,i) while(msk){ i=first_bit(msk); msk=forget_first_bit(msk); /* opens a loop over the set bits of msk, lowest first; msk is consumed and i receives each bit index */
+#define enum_bits_end          } /* closes the loop opened by enum_bits_start */
+
+SI tp_mask get_bit(tp_mask x,int y){ tp_mask probe=((tp_mask)1)<<(y); return x&probe; } /* nonzero iff bit y of x is set */
+SI tp_mask shift_down(tp_mask x,int y){ x>>=y; return x; } /* x shifted right by y bits */
+SI tp_mask shift_up  (tp_mask x,int y){ x<<=y; return x; } /* x shifted left by y bits */
+SI tp_mask forget_low_bits(tp_mask m,int b)
+{
+  /* m with its b lowest bits cleared; the guard returns 0 instead of shifting by the mask width or more. */
+  return (b>=MSIZ_BIT) ? 0 : shift_up(shift_down(m,b),b);
+}
+SI tp_mask forget_high_bits(tp_mask m,int b)
+{
+	if (b>=MSIZ_BIT) return 0;
+  b=MSIZ_BIT-b; /* from here on b is the number of top bits that get shifted out */
+  return shift_down(shift_up(m,b),b); /* NOTE(review): for b==0 this shifts by the full mask width, and for 0<b<MSIZ_BIT the low b bits are what is kept, which looks at odds with returning 0 for b>=MSIZ_BIT -- confirm the intended meaning of b */
+}
+
+
+
+
+
+SI tp_vector BYTE_AT(uchar c,int shift)
+{ /* Vector that is zero except for byte c in lane shift (0..15). _mm_set_epi64x takes the high half first, and each half is built separately so no shift count is negative or reaches 64. */
+  return shift<8 ? _mm_set_epi64x(0,((uint64_t)c)<<(8*shift)) : _mm_set_epi64x(((uint64_t)c)<<(8*(shift-8)),0);
+}
+
+#define TEST_EQ  _mm_cmpeq_epi8
+#define AND  _mm_and_si128
+#define ANDNOT(x,y) _mm_andnot_si128(y,x) /* x AND NOT y: the intrinsic negates its first operand, hence the swapped arguments */
+#define OR   _mm_or_si128
+#define XOR  _mm_xor_si128
+#define SHIFT_DOWN _mm_srli_si128 /* whole-vector shift by bytes towards lane 0 */
+#define SHIFT_UP   _mm_slli_si128 /* whole-vector shift by bytes towards lane 15 */
+#define CONCAT(x,y,n) (((n)==0) ? (y) : (((n)==BYTES_AT_ONCE) ? (x) : OR(SHIFT_UP(x,BYTES_AT_ONCE-(n)),SHIFT_DOWN(y,(n))))) /* low lanes are y[n..15], high lanes are x[0..n-1]; uses the vector shifts SHIFT_UP/SHIFT_DOWN, since shift_up/shift_down take scalar masks */
+SI tp_vector BROADCAST(uchar c)
+{ /* Replicate c into every byte lane of a vector. */
+  tp_vector lanes=_mm_set1_epi8(c); return lanes;
+}
+
+SI tp_vector BROADCAST_ZERO(){
+	/* All-zero vector. _mm_setzero_si128 replaces XOR-ing an uninitialised
+	   local with itself, which read an indeterminate value. */
+  return _mm_setzero_si128();
+}
+#define find_zeros(x) get_mask(TEST_EQ(x,vzero)) /* mask bit i is set iff byte lane i of x is zero; needs a tp_vector named vzero in scope at the use site */
+
+
+#define LOAD(x) _mm_load_si128((tp_vector*)(x)) /* aligned load */
+#define LOAD_UNALIGNED(x) _mm_loadu_si128(x) /* unaligned load; unlike LOAD there is no cast, so x must already be a tp_vector pointer */
+
+
+SI tp_vector TEST_RANGE(tp_vector v,uchar from,uchar to){ /* lanes are all-ones where from<=byte<=to and zero elsewhere; requires to-from < 255 */
+	tp_vector fv=BROADCAST(-128-from); /* bias so that from maps to -128, the smallest signed byte; from-1 then wraps to +127 and is rejected (a bias of -127 let it through) */
+	v=_mm_add_epi8(v,fv);
+	tp_vector tv=BROADCAST(-128+to-from+1); /* first biased value above the range */
+	return _mm_cmplt_epi8(v,tv);
+}
+
+SI tp_vector PARALEL_TOLOWER(tp_vector m){
+  /* ASCII part: 0x80 in every lane TEST_RANGE flags as 'A'..'Z', shifted down to 0x20 and OR-ed in. */
+  tp_vector upper=AND(TEST_RANGE(m,'A','Z'),BROADCAST(128));
+  m=OR(m,_mm_srli_epi64(upper,2));
+  tp_mask pending=get_mask(m); /* lanes whose top bit is set get the per-byte isupper/tolower treatment */
+  int i;
+  enum_bits_start(pending,i) /* runs zero times when pending is 0 */
+    uchar *c=((uchar*)&m)+i;
+    if (isupper(*c)) *c=tolower(*c);
+  enum_bits_end
+  return m;
+}
+
+
diff --git a/sysdeps/x86_64/ssse3.h b/sysdeps/x86_64/ssse3.h
new file mode 100644
index 0000000..01b5534
--- /dev/null
+++ b/sysdeps/x86_64/ssse3.h
@@ -0,0 +1,4 @@
+#include "sse2.h"
+#undef CONCAT
+
+#define CONCAT(x,y,n) _mm_alignr_epi8(x,y,n) /* replaces the shift-and-OR CONCAT from sse2.h with the single SSSE3 instruction */
diff --git a/sysdeps/x86_64/strstr.c b/sysdeps/x86_64/strstr.c
new file mode 100644
index 0000000..b48303d
--- /dev/null
+++ b/sysdeps/x86_64/strstr.c
@@ -0,0 +1,3 @@
+#include "sse2.h"
+#define AS_STRSTR
+#include "../../string/base/str.c"
-- 
1.7.4.4



Index Nav: [Date Index] [Subject Index] [Author Index] [Thread Index]
Message Nav: [Date Prev] [Date Next] [Thread Prev] [Thread Next]