spacepaste

  1.  
  2. #! /usr/bin/env python
  3. from twisted.trial.unittest import TestCase
  4. import os.path as osp
  5. from pickle import dumps
  6. from shutil import rmtree
  7. from os import stat, mkdir
  8. from tempfile import mkdtemp
  9. from twisted.internet import defer
  10. from twisted.enterprise import adbapi
  11. from zope.interface.verify import verifyClass
  12. from pydio.engine import sqlite
  13. class TestStateManagement(TestCase):
  14. """Test state management"""
  15. def setUp(self):
  16. self.meta = mkdtemp()
  17. self.ws = mkdtemp()
  18. self.db = adbapi.ConnectionPool(
  19. "sqlite3", osp.join(self.meta, "db.sqlite"), check_same_thread=False,
  20. )
  21. self.stateman = sqlite.StateManager(self.db)
  22. with open(sqlite.SQL_INIT_FILE) as f:
  23. script = f.read()
  24. self.d = self.db.runInteraction(lambda c, s: c.executescript(s), script)
  25. def tearDown(self):
  26. self.db.close()
  27. del self.db
  28. del self.stateman
  29. del self.d
  30. rmtree(self.meta)
  31. rmtree(self.ws)
  32. @defer.inlineCallbacks
  33. def test_db_clean(self):
  34. """Canary test to ensure that the db is initialized in a blank state"""
  35. yield self.d # wait for the db to be initialized
  36. q = "SELECT name FROM sqlite_master WHERE type='table' AND name=?;"
  37. for table in ("ajxp_index", "ajxp_changes"):
  38. res = yield self.db.runQuery(q, (table,))
  39. self.assertTrue(
  40. len(res) == 1,
  41. "table {0} does not exist".format(table)
  42. )
  43. @defer.inlineCallbacks
  44. def test_inode_create_file(self):
  45. yield self.d
  46. path = osp.join(self.ws, "test.txt")
  47. with open(path, "wt") as f:
  48. pass # touch file
  49. inode = mk_dummy_inode(path)
  50. yield self.stateman.create(inode, directory=False)
  51. entry = yield self.db.runQuery("SELECT * FROM ajxp_index")
  52. emsg = "got {0} results, expected 1. Are canary tests failing?"
  53. lentry = len(entry)
  54. self.assertTrue(lentry == 1, emsg.format(lentry))
  55.