
Commit eda79ab

rxin authored and harveyfeng committed
Merge pull request #253 from harveyfeng/tablescan
Correctly detect in-memory RDDs for table scans.
1 parent 5c271bb commit eda79ab

File tree

2 files changed: +14 -9 lines


src/main/scala/shark/execution/TableScanOperator.scala (+10 -9)

@@ -37,7 +37,9 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 
 import shark.{LogHelper, SharkConfVars, SharkEnv}
 import shark.execution.optimization.ColumnPruner
-import shark.memstore2.{CacheType, ColumnarSerDe, MemoryMetadataManager}
+import shark.memstore2.CacheType
+import shark.memstore2.CacheType._
+import shark.memstore2.{ColumnarSerDe, MemoryMetadataManager}
 import shark.memstore2.{TablePartition, TablePartitionStats}
 import shark.util.HiveUtils
 
@@ -70,22 +72,25 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
 
   @BeanProperty var tableDesc: TableDesc = _
 
+  // True if table data is stored in the Spark heap.
   @BeanProperty var isInMemoryTableScan: Boolean = _
 
+  @BeanProperty var cacheMode: CacheType.CacheType = _
+
 
   override def initializeOnMaster() {
     // Create a local copy of the HiveConf that will be assigned job properties and, for disk reads,
     // broadcasted to slaves.
     localHConf = new HiveConf(super.hconf)
+    cacheMode = CacheType.fromString(
+      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     isInMemoryTableScan = SharkEnv.memoryMetadataManager.containsTable(
       table.getDbName, table.getTableName)
   }
 
   override def outputObjectInspector() = {
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     if (parts == null) {
-      val serializer = if (CacheType.shouldCache(cacheMode)) {
+      val serializer = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
         new ColumnarSerDe
       } else {
         tableDesc.getDeserializerClass().newInstance()
@@ -94,7 +99,7 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
       serializer.getObjectInspector()
     } else {
       val partProps = firstConfPartDesc.getProperties()
-      val partSerDe = if (CacheType.shouldCache(cacheMode)) {
+      val partSerDe = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
         new ColumnarSerDe
       } else {
         firstConfPartDesc.getDeserializerClass().newInstance()
@@ -115,8 +120,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
     // 1. Spark heap (block manager), accessed through the Shark MemoryMetadataManager
     // 2. Tachyon table
     // 3. Hive table on HDFS (or other Hadoop storage)
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     // TODO(harvey): Pruning Hive-partitioned, cached tables isn't supported yet.
     if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
       if (isInMemoryTableScan) {
@@ -147,8 +150,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
     // the input table and we have statistics on the table.
     val columnsUsed = new ColumnPruner(this, table).columnsUsed
 
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     if (!table.isPartitioned && cacheMode == CacheType.TACHYON) {
      SharkEnv.tachyonUtil.pushDownColumnPruning(rdd, columnsUsed)
     }

src/test/scala/shark/SQLSuite.scala (+4)

@@ -1042,6 +1042,10 @@ class SQLSuite extends FunSuite {
       val cachedCount = cachedTableCounts(i)
       assert(onDiskCount == cachedCount, """Num rows for %s differ across Shark metastore restart.
         (rows cached = %s, rows on disk = %s)""".format(tableName, cachedCount, onDiskCount))
+      // Check that we're able to materialize a row - i.e., make sure that the table scan operator
+      // doesn't try to use a ColumnarSerDe when scanning contents on disk (for our test tables,
+      // LazySimpleSerDes should be used).
+      sc.sql("select * from %s limit 1".format(tableName))
     }
     // Finally, reload all tables.
     SharkRunner.loadTables()
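Why the test materializes a row instead of only counting: a count-style assertion can succeed without ever pushing a row through the SerDe, so a wrong ColumnarSerDe pick surfaces only once a row is actually deserialized. Below is a toy illustration of that failure mode; the ToySerDe trait, sample rows, and object names are invented for this sketch and are not Shark or Hive APIs.

import scala.util.Try

// Toy stand-ins: a text-row SerDe vs. one that expects columnar bytes.
trait ToySerDe { def deserialize(raw: String): Seq[String] }

object LazySimpleLike extends ToySerDe {
  // Splits on Hive's default ^A field delimiter, like a LazySimpleSerDe would.
  def deserialize(raw: String): Seq[String] = raw.split('\u0001').toSeq
}

object ColumnarLike extends ToySerDe {
  // Like a ColumnarSerDe fed on-disk text rows: fails only when decoding.
  def deserialize(raw: String): Seq[String] =
    sys.error("expected columnar TablePartition bytes, got a text row")
}

object WhyMaterializeARow {
  val onDiskRows = Seq("1\u0001alice", "2\u0001bob")

  def main(args: Array[String]): Unit = {
    val wrongPick: ToySerDe = ColumnarLike // the choice this commit prevents
    // A row count never touches the SerDe, so it passes regardless:
    println(onDiskRows.size) // 2
    // Materializing even one row ("select * ... limit 1") exercises
    // deserialization and exposes the wrong pick:
    println(Try(wrongPick.deserialize(onDiskRows.head)).isFailure) // true
  }
}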
